Skip to content

Commit

Permalink
Merge pull request #24602 from andrwng/datalake-manifest-list-fix
Browse files Browse the repository at this point in the history
iceberg: fix spec inconsistency in manifest list files_count
  • Loading branch information
andrwng authored Dec 23, 2024
2 parents 1c2f495 + 59cc2c3 commit 8b6ec45
Show file tree
Hide file tree
Showing 5 changed files with 118 additions and 28 deletions.
9 changes: 6 additions & 3 deletions src/v/iceberg/avroschemas/manifest_file.schema.json
Original file line number Diff line number Diff line change
Expand Up @@ -45,19 +45,22 @@
"field-id": 503
},
{
"name": "added_data_files_count",
"name": "added_files_count",
"aliases": ["added_data_files_count"],
"type": "int",
"doc": "Added entry count",
"field-id": 504
},
{
"name": "existing_data_files_count",
"name": "existing_files_count",
"aliases": ["existing_data_files_count"],
"type": "int",
"doc": "Existing entry count",
"field-id": 505
},
{
"name": "deleted_data_files_count",
"name": "deleted_files_count",
"aliases": ["deleted_data_files_count"],
"type": "int",
"doc": "Deleted entry count",
"field-id": 506
Expand Down
13 changes: 6 additions & 7 deletions src/v/iceberg/manifest_list_avro.cc
Original file line number Diff line number Diff line change
Expand Up @@ -96,10 +96,9 @@ avrogen::manifest_file file_to_avro(const manifest_file& f) {
ret.min_sequence_number = f.min_seq_number();
ret.added_snapshot_id = f.added_snapshot_id();

ret.added_data_files_count = static_cast<int32_t>(f.added_files_count);
ret.existing_data_files_count = static_cast<int32_t>(
f.existing_files_count);
ret.deleted_data_files_count = static_cast<int32_t>(f.deleted_files_count);
ret.added_files_count = static_cast<int32_t>(f.added_files_count);
ret.existing_files_count = static_cast<int32_t>(f.existing_files_count);
ret.deleted_files_count = static_cast<int32_t>(f.deleted_files_count);

ret.added_rows_count = static_cast<int32_t>(f.added_rows_count);
ret.existing_rows_count = static_cast<int32_t>(f.existing_rows_count);
Expand Down Expand Up @@ -128,9 +127,9 @@ manifest_file file_from_avro(const avrogen::manifest_file& f) {
ret.min_seq_number = sequence_number{f.min_sequence_number};
ret.added_snapshot_id = snapshot_id{f.added_snapshot_id};

ret.added_files_count = f.added_data_files_count;
ret.existing_files_count = f.existing_data_files_count;
ret.deleted_files_count = f.deleted_data_files_count;
ret.added_files_count = f.added_files_count;
ret.existing_files_count = f.existing_files_count;
ret.deleted_files_count = f.deleted_files_count;

ret.added_rows_count = f.added_rows_count;
ret.existing_rows_count = f.existing_rows_count;
Expand Down
30 changes: 12 additions & 18 deletions src/v/iceberg/tests/manifest_serialization_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -163,9 +163,9 @@ TEST(ManifestSerializationTest, TestManifestFile) {
manifest.sequence_number = 3;
manifest.min_sequence_number = 4;
manifest.added_snapshot_id = 5;
manifest.added_data_files_count = 6;
manifest.existing_data_files_count = 7;
manifest.deleted_data_files_count = 8;
manifest.added_files_count = 6;
manifest.existing_files_count = 7;
manifest.deleted_files_count = 8;
manifest.added_rows_count = 9;
manifest.existing_rows_count = 10;
manifest.deleted_rows_count = 11;
Expand Down Expand Up @@ -198,12 +198,9 @@ TEST(ManifestSerializationTest, TestManifestFile) {
EXPECT_EQ(manifest.sequence_number, dmanifest.sequence_number);
EXPECT_EQ(manifest.min_sequence_number, dmanifest.min_sequence_number);
EXPECT_EQ(manifest.added_snapshot_id, dmanifest.added_snapshot_id);
EXPECT_EQ(
manifest.added_data_files_count, dmanifest.added_data_files_count);
EXPECT_EQ(
manifest.existing_data_files_count, dmanifest.existing_data_files_count);
EXPECT_EQ(
manifest.deleted_data_files_count, dmanifest.deleted_data_files_count);
EXPECT_EQ(manifest.added_files_count, dmanifest.added_files_count);
EXPECT_EQ(manifest.existing_files_count, dmanifest.existing_files_count);
EXPECT_EQ(manifest.deleted_files_count, dmanifest.deleted_files_count);
EXPECT_EQ(manifest.added_rows_count, dmanifest.added_rows_count);
EXPECT_EQ(manifest.existing_rows_count, dmanifest.existing_rows_count);
EXPECT_EQ(manifest.deleted_rows_count, dmanifest.deleted_rows_count);
Expand All @@ -218,9 +215,9 @@ TEST(ManifestSerializationTest, TestManifestAvroReaderWriter) {
manifest.sequence_number = 3;
manifest.min_sequence_number = 4;
manifest.added_snapshot_id = 5;
manifest.added_data_files_count = 6;
manifest.existing_data_files_count = 7;
manifest.deleted_data_files_count = 8;
manifest.added_files_count = 6;
manifest.existing_files_count = 7;
manifest.deleted_files_count = 8;
manifest.added_rows_count = 9;
manifest.existing_rows_count = 10;
manifest.deleted_rows_count = 11;
Expand Down Expand Up @@ -264,12 +261,9 @@ TEST(ManifestSerializationTest, TestManifestAvroReaderWriter) {
EXPECT_EQ(manifest.sequence_number, dmanifest.sequence_number);
EXPECT_EQ(manifest.min_sequence_number, dmanifest.min_sequence_number);
EXPECT_EQ(manifest.added_snapshot_id, dmanifest.added_snapshot_id);
EXPECT_EQ(
manifest.added_data_files_count, dmanifest.added_data_files_count);
EXPECT_EQ(
manifest.existing_data_files_count, dmanifest.existing_data_files_count);
EXPECT_EQ(
manifest.deleted_data_files_count, dmanifest.deleted_data_files_count);
EXPECT_EQ(manifest.added_files_count, dmanifest.added_files_count);
EXPECT_EQ(manifest.existing_files_count, dmanifest.existing_files_count);
EXPECT_EQ(manifest.deleted_files_count, dmanifest.deleted_files_count);
EXPECT_EQ(manifest.added_rows_count, dmanifest.added_rows_count);
EXPECT_EQ(manifest.existing_rows_count, dmanifest.existing_rows_count);
EXPECT_EQ(manifest.deleted_rows_count, dmanifest.deleted_rows_count);
Expand Down
25 changes: 25 additions & 0 deletions tests/rptest/tests/datalake/datalake_e2e_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,31 @@ def test_avro_schema(self, cloud_storage_type, query_engine):
assert spark_describe_out == spark_expected_out, str(
spark_describe_out)

@cluster(num_nodes=4)
@matrix(cloud_storage_type=supported_storage_types())
def test_upload_after_external_update(self, cloud_storage_type):
table_name = f"redpanda.{self.topic_name}"
with DatalakeServices(self.test_ctx,
redpanda=self.redpanda,
filesystem_catalog_mode=True,
include_query_engines=[QueryEngineType.SPARK
]) as dl:
count = 100
dl.create_iceberg_enabled_topic(self.topic_name, partitions=1)
dl.produce_to_topic(self.topic_name, 1024, count)
dl.wait_for_translation(self.topic_name, count)
spark = dl.spark()
spark.make_client().cursor().execute(f"delete from {table_name}")
count_after_del = spark.count_table("redpanda", self.topic_name)
assert count_after_del == 0, f"{count_after_del} rows, expected 0"

dl.produce_to_topic(self.topic_name, 1024, count)
dl.wait_for_translation_until_offset(self.topic_name,
2 * count - 1)
count_after_produce = spark.count_table("redpanda",
self.topic_name)
assert count_after_produce == count, f"{count_after_produce} rows, expected {count}"

@cluster(num_nodes=4)
@matrix(cloud_storage_type=supported_storage_types(),
filesystem_catalog_mode=[True, False])
Expand Down
69 changes: 69 additions & 0 deletions tests/rptest/tests/datalake/datalake_upgrade_test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
# Copyright 2024 Redpanda Data, Inc.
#
# Use of this software is governed by the Business Source License
# included in the file licenses/BSL.md
#
# As of the Change Date specified in that file, in accordance with
# the Business Source License, use of this software will be governed
# by the Apache License, Version 2.0
from rptest.services.cluster import cluster

from rptest.services.redpanda import SISettings
from rptest.utils.mode_checks import skip_debug_mode
from rptest.tests.redpanda_test import RedpandaTest
from rptest.tests.datalake.datalake_services import DatalakeServices
from rptest.tests.datalake.query_engine_base import QueryEngineType
from rptest.tests.datalake.utils import supported_storage_types
from ducktape.mark import matrix


class DatalakeUpgradeTest(RedpandaTest):
def __init__(self, test_context):
super(DatalakeUpgradeTest,
self).__init__(test_context,
num_brokers=3,
si_settings=SISettings(test_context=test_context),
extra_rp_conf={
"iceberg_enabled": "true",
"iceberg_catalog_commit_interval_ms": 5000
})
self.test_ctx = test_context
self.topic_name = "upgrade_topic"

# Initial version that supported Iceberg.
self.initial_version = (24, 3)

def setUp(self):
self.redpanda._installer.install(self.redpanda.nodes,
self.initial_version)

@cluster(num_nodes=6)
@skip_debug_mode
@matrix(cloud_storage_type=supported_storage_types(),
query_engine=[QueryEngineType.SPARK])
def test_upload_through_upgrade(self, cloud_storage_type, query_engine):
"""
Test that Iceberg translation can progress through different versions
of Redpanda (e.g. ensuring that data format changes or additional
Iceberg fields don't block progress).
"""
total_count = 0
versions = self.load_version_range(self.initial_version)[1:]
with DatalakeServices(self.test_ctx,
redpanda=self.redpanda,
filesystem_catalog_mode=True,
include_query_engines=[query_engine]) as dl:
dl.create_iceberg_enabled_topic(self.topic_name, partitions=10)

def run_workload():
nonlocal total_count
count = 100
dl.produce_to_topic(self.topic_name, 1024, msg_count=count)
total_count += count
dl.wait_for_translation(self.topic_name, msg_count=total_count)

versions = self.load_version_range(self.initial_version)
for v in self.upgrade_through_versions(versions_in=versions,
already_running=True):
self.logger.info(f"Updated to {v}")
run_workload()

0 comments on commit 8b6ec45

Please sign in to comment.