diff --git a/src/v/cloud_storage/tests/topic_manifest_test.cc b/src/v/cloud_storage/tests/topic_manifest_test.cc index 515a1c6a1afbf..746b08fbc177b 100644 --- a/src/v/cloud_storage/tests/topic_manifest_test.cc +++ b/src/v/cloud_storage/tests/topic_manifest_test.cc @@ -484,6 +484,7 @@ SEASTAR_THREAD_TEST_CASE(test_topic_manifest_serde_feature_table) { false, tristate{}, std::nullopt, + std::nullopt, }; auto random_initial_revision_id diff --git a/src/v/cluster/topic_properties.cc b/src/v/cluster/topic_properties.cc index c6f7e67607531..ea7ede1461513 100644 --- a/src/v/cluster/topic_properties.cc +++ b/src/v/cluster/topic_properties.cc @@ -42,7 +42,8 @@ std::ostream& operator<<(std::ostream& o, const topic_properties& properties) { "remote_label: {}, iceberg_mode: {}, " "leaders_preference: {}, " "delete_retention_ms: {}, " - "iceberg_delete: {}", + "iceberg_delete: {}, " + "iceberg_partition_spec: {}", properties.compression, properties.cleanup_policy_bitflags, properties.compaction_strategy, @@ -79,7 +80,8 @@ std::ostream& operator<<(std::ostream& o, const topic_properties& properties) { properties.iceberg_mode, properties.leaders_preference, properties.delete_retention_ms, - properties.iceberg_delete); + properties.iceberg_delete, + properties.iceberg_partition_spec); if (config::shard_local_cfg().development_enable_cloud_topics()) { fmt::print( @@ -125,7 +127,7 @@ bool topic_properties::has_overrides() const { || flush_bytes.has_value() || remote_label.has_value() || (iceberg_mode != storage::ntp_config::default_iceberg_mode) || leaders_preference.has_value() || delete_retention_ms.is_engaged() - || iceberg_delete.has_value(); + || iceberg_delete.has_value() || iceberg_partition_spec.has_value(); if (config::shard_local_cfg().development_enable_cloud_topics()) { return overrides @@ -261,6 +263,7 @@ adl::from(iobuf_parser& parser) { false, tristate{disable_tristate}, std::nullopt, + std::nullopt, }; } diff --git a/src/v/cluster/topic_properties.h 
b/src/v/cluster/topic_properties.h index 93797c69ad494..a3ab7e739da49 100644 --- a/src/v/cluster/topic_properties.h +++ b/src/v/cluster/topic_properties.h @@ -33,7 +33,7 @@ namespace cluster { */ struct topic_properties : serde:: - envelope, serde::compat_version<0>> { + envelope, serde::compat_version<0>> { topic_properties() noexcept = default; topic_properties( std::optional compression, @@ -76,7 +76,8 @@ struct topic_properties std::optional leaders_preference, bool cloud_topic_enabled, tristate delete_retention_ms, - std::optional iceberg_delete) + std::optional iceberg_delete, + std::optional iceberg_partition_spec) : compression(compression) , cleanup_policy_bitflags(cleanup_policy_bitflags) , compaction_strategy(compaction_strategy) @@ -118,7 +119,8 @@ struct topic_properties , leaders_preference(std::move(leaders_preference)) , cloud_topic_enabled(cloud_topic_enabled) , delete_retention_ms(delete_retention_ms) - , iceberg_delete(iceberg_delete) {} + , iceberg_delete(iceberg_delete) + , iceberg_partition_spec(std::move(iceberg_partition_spec)) {} std::optional compression; std::optional cleanup_policy_bitflags; @@ -194,6 +196,7 @@ struct topic_properties tristate delete_retention_ms{disable_tristate}; // Should we delete the corresponding iceberg table when deleting the topic. 
std::optional iceberg_delete; + std::optional iceberg_partition_spec; bool is_compacted() const; bool has_overrides() const; @@ -241,7 +244,8 @@ struct topic_properties leaders_preference, cloud_topic_enabled, delete_retention_ms, - iceberg_delete); + iceberg_delete, + iceberg_partition_spec); } friend bool operator==(const topic_properties&, const topic_properties&) diff --git a/src/v/cluster/topic_table.cc b/src/v/cluster/topic_table.cc index 462bfba208f61..e06a44055ec41 100644 --- a/src/v/cluster/topic_table.cc +++ b/src/v/cluster/topic_table.cc @@ -1101,6 +1101,9 @@ topic_properties topic_table::update_topic_properties( updated_properties.delete_retention_ms, overrides.delete_retention_ms); incremental_update( updated_properties.iceberg_delete, overrides.iceberg_delete); + incremental_update( + updated_properties.iceberg_partition_spec, + overrides.iceberg_partition_spec); return updated_properties; } diff --git a/src/v/cluster/types.cc b/src/v/cluster/types.cc index f99c0df161268..49d6595c0135c 100644 --- a/src/v/cluster/types.cc +++ b/src/v/cluster/types.cc @@ -386,7 +386,8 @@ std::ostream& operator<<(std::ostream& o, const incremental_topic_updates& i) { "initial_retention_local_target_bytes: {}, " "initial_retention_local_target_ms: {}, write_caching: {}, flush_ms: {}, " "flush_bytes: {}, iceberg_enabled: {}, leaders_preference: {}, " - "remote_read: {}, remote_write: {}, iceberg_delete: {}", + "remote_read: {}, remote_write: {}, iceberg_delete: {}, " + "iceberg_partition_spec: {}", i.compression, i.cleanup_policy_bitflags, i.compaction_strategy, @@ -417,7 +418,8 @@ std::ostream& operator<<(std::ostream& o, const incremental_topic_updates& i) { i.leaders_preference, i.remote_read, i.remote_write, - i.iceberg_delete); + i.iceberg_delete, + i.iceberg_partition_spec); return o; } diff --git a/src/v/cluster/types.h b/src/v/cluster/types.h index ef851afedc352..9684aac6889c3 100644 --- a/src/v/cluster/types.h +++ b/src/v/cluster/types.h @@ -568,7 +568,7 @@ struct 
property_update> struct incremental_topic_updates : serde::envelope< incremental_topic_updates, - serde::version<7>, + serde::version<8>, serde::compat_version<0>> { static constexpr int8_t version_with_data_policy = -1; static constexpr int8_t version_with_shadow_indexing = -3; @@ -641,6 +641,7 @@ struct incremental_topic_updates leaders_preference; property_update> delete_retention_ms; property_update> iceberg_delete; + property_update> iceberg_partition_spec; // To allow us to better control use of the deprecated shadow_indexing // field, use getters and setters instead. @@ -680,7 +681,8 @@ struct incremental_topic_updates remote_read, remote_write, delete_retention_ms, - iceberg_delete); + iceberg_delete, + iceberg_partition_spec); } friend std::ostream& diff --git a/src/v/compat/cluster_generator.h b/src/v/compat/cluster_generator.h index 08b598bf489e4..018296e45defa 100644 --- a/src/v/compat/cluster_generator.h +++ b/src/v/compat/cluster_generator.h @@ -655,6 +655,7 @@ struct instance_generator { std::nullopt, false, tristate{disable_tristate}, + std::nullopt, std::nullopt}; } diff --git a/src/v/config/configuration.cc b/src/v/config/configuration.cc index b9a1ab1809155..f4c9bd720274f 100644 --- a/src/v/config/configuration.cc +++ b/src/v/config/configuration.cc @@ -3793,6 +3793,15 @@ configuration::configuration() "the topic.", {.needs_restart = needs_restart::no, .visibility = visibility::user}, true) + , iceberg_default_partition_spec( + *this, + "iceberg_default_partition_spec", + "Default value for the redpanda.iceberg.partition.spec topic property " + "that determines the partition spec for the Iceberg table corresponding " + "to the topic.", + {.needs_restart = needs_restart::no, .visibility = visibility::user}, + "(hour(redpanda.timestamp))", + &validate_non_empty_string_opt) , development_enable_cloud_topics( *this, "development_enable_cloud_topics", diff --git a/src/v/config/configuration.h b/src/v/config/configuration.h index 
cea4325929f85..241a0c8f5ccbd 100644 --- a/src/v/config/configuration.h +++ b/src/v/config/configuration.h @@ -719,6 +719,7 @@ struct configuration final : public config_store { property> iceberg_rest_catalog_prefix; property iceberg_delete; + property iceberg_default_partition_spec; configuration(); diff --git a/src/v/datalake/BUILD b/src/v/datalake/BUILD index ca4ae9fc3133f..39ecf3c34fe06 100644 --- a/src/v/datalake/BUILD +++ b/src/v/datalake/BUILD @@ -235,13 +235,9 @@ redpanda_cc_library( hdrs = [ "table_definition.h", ], - implementation_deps = [ - "//src/v/iceberg:transform", - ], include_prefix = "datalake", visibility = [":__subpackages__"], deps = [ - "//src/v/iceberg:partition", "//src/v/iceberg:schema", ], ) diff --git a/src/v/datalake/coordinator/BUILD b/src/v/datalake/coordinator/BUILD index 8f21cfda4030a..31058ee06de42 100644 --- a/src/v/datalake/coordinator/BUILD +++ b/src/v/datalake/coordinator/BUILD @@ -43,7 +43,6 @@ redpanda_cc_library( "//src/v/datalake:record_schema_resolver", "//src/v/datalake:record_translator", "//src/v/datalake:schema_identifier", - "//src/v/datalake:table_definition", "//src/v/datalake:table_id_provider", "//src/v/datalake:types", "//src/v/ssx:future_util", diff --git a/src/v/datalake/coordinator/coordinator.cc b/src/v/datalake/coordinator/coordinator.cc index 333d10ca88a8f..1997602534cbe 100644 --- a/src/v/datalake/coordinator/coordinator.cc +++ b/src/v/datalake/coordinator/coordinator.cc @@ -16,7 +16,6 @@ #include "datalake/coordinator/state_update.h" #include "datalake/logger.h" #include "datalake/record_translator.h" -#include "datalake/table_definition.h" #include "datalake/table_id_provider.h" #include "model/fundamental.h" #include "ssx/future-util.h" @@ -229,11 +228,25 @@ checked coordinator::maybe_gate() { return gate_.hold(); } +struct coordinator::table_schema_provider { + virtual iceberg::table_identifier get_id(const model::topic&) const = 0; + + virtual ss::future> + get_record_type(record_schema_components) 
const = 0; + + virtual ss::sstring get_partition_spec(const cluster::topic_metadata&) const + = 0; + + virtual ~table_schema_provider() = default; +}; + ss::future> -coordinator::sync_ensure_table_exists( +coordinator::do_ensure_table_exists( model::topic topic, model::revision_id topic_revision, - record_schema_components comps) { + record_schema_components comps, + std::string_view method_name, + const table_schema_provider& schema_provider) { auto gate = maybe_gate(); if (gate.has_error()) { co_return gate.error(); @@ -241,7 +254,8 @@ coordinator::sync_ensure_table_exists( vlog( datalake_log.debug, - "Sync ensure table exists requested, topic: {} rev: {}", + "{} requested, topic: {} rev: {}", + method_name, topic, topic_revision); @@ -252,6 +266,35 @@ coordinator::sync_ensure_table_exists( // TODO: add mutex to protect against the thundering herd problem + auto topic_md = topic_table_.get_topic_metadata_ref( + model::topic_namespace_view{model::kafka_namespace, topic}); + if (!topic_md || topic_md->get().get_revision() != topic_revision) { + vlog( + datalake_log.debug, + "Rejecting {} for {} rev {}, topic table revision {}", + method_name, + topic, + topic_revision, + topic_md ? 
std::optional{topic_md->get().get_revision()} + : std::nullopt); + co_return errc::revision_mismatch; + } + + auto partition_spec_str = schema_provider.get_partition_spec( + topic_md->get()); + auto partition_spec = iceberg::unresolved_partition_spec::parse( + partition_spec_str); + if (!partition_spec.has_value()) { + vlog( + datalake_log.warn, + "{} failed, couldn't parse partition spec for {} rev {}: {}", + method_name, + topic, + topic_revision, + partition_spec.error()); + co_return errc::failed; + } + topic_lifecycle_update update{ .topic = topic, .revision = topic_revision, @@ -261,15 +304,14 @@ coordinator::sync_ensure_table_exists( if (check_res.has_error()) { vlog( datalake_log.debug, - "Rejecting ensure_table_exist for {} rev {}: {}", + "Rejecting {} for {} rev {}: {}", + method_name, topic, topic_revision, check_res.error()); co_return errc::revision_mismatch; } - // Will skip the STM update if the topic is already live. - // But will still ensure the DLQ table schema. if (check_res.value()) { // update is non-trivial storage::record_batch_builder builder( @@ -286,21 +328,16 @@ coordinator::sync_ensure_table_exists( // TODO: verify stm state after replication - auto table_id = table_id_provider::table_id(topic); + auto table_id = schema_provider.get_id(topic); - std::optional val_type; - if (comps.val_identifier) { - auto type_res = co_await type_resolver_.resolve_identifier( - comps.val_identifier.value()); - if (type_res.has_error()) { - co_return errc::failed; - } - val_type = std::move(type_res.value()); + auto record_type = co_await schema_provider.get_record_type( + std::move(comps)); + if (!record_type.has_value()) { + co_return errc::failed; } - auto record_type = default_translator{}.build_type(std::move(val_type)); auto ensure_res = co_await schema_mgr_.ensure_table_schema( - table_id, record_type.type, hour_partition_spec()); + table_id, record_type.value(), partition_spec.value()); if (ensure_res.has_error()) { switch (ensure_res.error()) { 
case schema_manager::errc::not_supported: @@ -315,77 +352,84 @@ coordinator::sync_ensure_table_exists( co_return std::nullopt; } -ss::future> -coordinator::sync_ensure_dlq_table_exists( - model::topic topic, model::revision_id topic_revision) { - auto gate = maybe_gate(); - if (gate.has_error()) { - co_return gate.error(); +struct coordinator::main_table_schema_provider + : public coordinator::table_schema_provider { + explicit main_table_schema_provider(coordinator& parent) + : parent(parent) {} + + iceberg::table_identifier get_id(const model::topic& topic) const final { + return table_id_provider::table_id(topic); } - vlog( - datalake_log.debug, - "Sync ensure dlq table exists requested, topic: {} rev: {}", - topic, - topic_revision); + ss::future> + get_record_type(record_schema_components comps) const final { + std::optional val_type; + if (comps.val_identifier) { + auto type_res = co_await parent.type_resolver_.resolve_identifier( + comps.val_identifier.value()); + if (type_res.has_error()) { + co_return errc::failed; + } + val_type = std::move(type_res.value()); + } - auto sync_res = co_await stm_->sync(10s); - if (sync_res.has_error()) { - co_return convert_stm_errc(sync_res.error()); + auto record_type = default_translator{}.build_type(std::move(val_type)); + co_return std::move(record_type.type); } - // TODO: add mutex to protect against the thundering herd problem - - topic_lifecycle_update update{ - .topic = topic, - .revision = topic_revision, - .new_state = topic_state::lifecycle_state_t::live, - }; - auto check_res = update.can_apply(stm_->state()); - if (check_res.has_error()) { - vlog( - datalake_log.debug, - "Rejecting ensure_dlq_table_exist for {} rev {}: {}", - topic, - topic_revision, - check_res.error()); - co_return errc::revision_mismatch; + ss::sstring + get_partition_spec(const cluster::topic_metadata& topic_md) const final { + return topic_md.get_configuration() + .properties.iceberg_partition_spec.value_or( + 
parent.default_partition_spec_()); } - // Will skip the STM update if the topic is already live. - // But will still ensure the DLQ table schema. - if (check_res.value()) { - // update is non-trivial - storage::record_batch_builder builder( - model::record_batch_type::datalake_coordinator, model::offset{0}); - builder.add_raw_kv( - serde::to_iobuf(topic_lifecycle_update::key), - serde::to_iobuf(std::move(update))); - auto repl_res = co_await stm_->replicate_and_wait( - sync_res.value(), std::move(builder).build(), as_); - if (repl_res.has_error()) { - co_return convert_stm_errc(repl_res.error()); - } + const coordinator& parent; +}; + +ss::future> +coordinator::sync_ensure_table_exists( + model::topic topic, + model::revision_id topic_revision, + record_schema_components comps) { + co_return co_await do_ensure_table_exists( + topic, + topic_revision, + std::move(comps), + "sync_ensure_table_exists", + main_table_schema_provider{*this}); +} + +struct coordinator::dlq_table_schema_provider + : public coordinator::table_schema_provider { + explicit dlq_table_schema_provider(coordinator& parent) + : parent(parent) {} + + iceberg::table_identifier get_id(const model::topic& topic) const final { + return table_id_provider::dlq_table_id(topic); } - // TODO: verify stm state after replication + ss::future> + get_record_type(record_schema_components) const final { + co_return key_value_translator{}.build_type(std::nullopt).type; + } - auto dlq_table_id = table_id_provider::dlq_table_id(topic); - auto record_type = key_value_translator{}.build_type(std::nullopt); - auto ensure_res = co_await schema_mgr_.ensure_table_schema( - dlq_table_id, record_type.type, hour_partition_spec()); - if (ensure_res.has_error()) { - switch (ensure_res.error()) { - case schema_manager::errc::not_supported: - co_return errc::incompatible_schema; - case schema_manager::errc::failed: - co_return errc::failed; - case schema_manager::errc::shutting_down: - co_return errc::shutting_down; - } + 
ss::sstring get_partition_spec(const cluster::topic_metadata&) const final { + return parent.default_partition_spec_(); } - co_return std::nullopt; + const coordinator& parent; +}; + +ss::future> +coordinator::sync_ensure_dlq_table_exists( + model::topic topic, model::revision_id topic_revision) { + co_return co_await do_ensure_table_exists( + topic, + topic_revision, + record_schema_components{}, + "sync_ensure_dlq_table_exists", + dlq_table_schema_provider{*this}); } ss::future> diff --git a/src/v/datalake/coordinator/coordinator.h b/src/v/datalake/coordinator/coordinator.h index 09a826bbd57f2..b3bae41c14624 100644 --- a/src/v/datalake/coordinator/coordinator.h +++ b/src/v/datalake/coordinator/coordinator.h @@ -45,14 +45,16 @@ class coordinator { schema_manager& schema_mgr, remove_tombstone_f remove_tombstone, file_committer& file_committer, - config::binding commit_interval) + config::binding commit_interval, + config::binding default_partition_spec) : stm_(std::move(stm)) , topic_table_(topics) , type_resolver_(type_resolver) , schema_mgr_(schema_mgr) , remove_tombstone_(std::move(remove_tombstone)) , file_committer_(file_committer) - , commit_interval_(std::move(commit_interval)) {} + , commit_interval_(std::move(commit_interval)) + , default_partition_spec_(std::move(default_partition_spec)) {} void start(); ss::future<> stop_and_wait(); @@ -98,6 +100,16 @@ class coordinator { ss::future> update_lifecycle_state(const model::topic&, model::term_id); + struct table_schema_provider; + struct main_table_schema_provider; + struct dlq_table_schema_provider; + ss::future> do_ensure_table_exists( + model::topic, + model::revision_id topic_revision, + record_schema_components, + std::string_view method_name, + const table_schema_provider&); + ss::shared_ptr stm_; cluster::topic_table& topic_table_; type_resolver& type_resolver_; @@ -105,6 +117,7 @@ class coordinator { remove_tombstone_f remove_tombstone_; file_committer& file_committer_; config::binding 
commit_interval_; + config::binding default_partition_spec_; ss::gate gate_; ss::abort_source as_; diff --git a/src/v/datalake/coordinator/coordinator_manager.cc b/src/v/datalake/coordinator/coordinator_manager.cc index 26a76db26cb35..e020e14e55368 100644 --- a/src/v/datalake/coordinator/coordinator_manager.cc +++ b/src/v/datalake/coordinator/coordinator_manager.cc @@ -123,7 +123,8 @@ void coordinator_manager::start_managing(cluster::partition& p) { return remove_tombstone(t, rev); }, *file_committer_, - config::shard_local_cfg().iceberg_catalog_commit_interval_ms.bind()); + config::shard_local_cfg().iceberg_catalog_commit_interval_ms.bind(), + config::shard_local_cfg().iceberg_default_partition_spec.bind()); if (p.is_leader()) { crd->notify_leadership(self_); } diff --git a/src/v/datalake/coordinator/tests/BUILD b/src/v/datalake/coordinator/tests/BUILD index dc883af5258c7..63af8fc70dee1 100644 --- a/src/v/datalake/coordinator/tests/BUILD +++ b/src/v/datalake/coordinator/tests/BUILD @@ -66,6 +66,7 @@ redpanda_cc_gtest( "//src/v/datalake:logger", "//src/v/datalake:table_definition", "//src/v/datalake/coordinator:iceberg_file_committer", + "//src/v/datalake/tests:test_utils", "//src/v/iceberg:filesystem_catalog", "//src/v/iceberg:manifest_entry", "//src/v/iceberg:manifest_io", diff --git a/src/v/datalake/coordinator/tests/CMakeLists.txt b/src/v/datalake/coordinator/tests/CMakeLists.txt index d72700bddf02f..85568a05a5c54 100644 --- a/src/v/datalake/coordinator/tests/CMakeLists.txt +++ b/src/v/datalake/coordinator/tests/CMakeLists.txt @@ -37,6 +37,7 @@ rp_test( v::cloud_io_utils v::datalake_coordinator v::datalake_coordinator_test_utils + v::datalake_test_utils v::gtest_main v::iceberg v::raft_fixture diff --git a/src/v/datalake/coordinator/tests/coordinator_test.cc b/src/v/datalake/coordinator/tests/coordinator_test.cc index 3a17ca6fe45ef..796ff313a48e9 100644 --- a/src/v/datalake/coordinator/tests/coordinator_test.cc +++ 
b/src/v/datalake/coordinator/tests/coordinator_test.cc @@ -73,7 +73,8 @@ struct coordinator_node { return remove_tombstone(t, r); }, *file_committer, - commit_interval_ms.bind()) {} + commit_interval_ms.bind(), + default_partition_spec.bind()) {} ss::future> remove_tombstone(const model::topic&, model::revision_id) { @@ -90,6 +91,8 @@ struct coordinator_node { coordinator_stm& stm; config::mock_property commit_interval_ms; + config::mock_property default_partition_spec{ + "(hour(redpanda.timestamp))"}; cluster::data_migrations::migrated_resources mr; cluster::topic_table topic_table; datalake::binary_type_resolver type_resolver; @@ -238,6 +241,23 @@ class CoordinatorTest : public raft::raft_fixture { raft::raft_fixture::TearDownAsync().get(); } + void register_in_topic_tables( + const model::topic& topic, model::revision_id rev) { + auto topic_cfg = cluster::topic_configuration( + model::kafka_namespace, topic, 1, 1); + for (auto& crd : crds) { + auto tt_res = crd->topic_table + .apply( + cluster::create_topic_cmd{ + model::topic_namespace{ + model::kafka_namespace, topic}, + {topic_cfg, {}}}, + model::offset{rev()}) + .get(); + ASSERT_EQ(tt_res, cluster::errc::success); + } + } + // Returns the coordinator on the current leader. 
using opt_ref = std::optional>; opt_ref leader_node() { @@ -329,8 +349,10 @@ TEST_F(CoordinatorTest, TestAddFilesHappyPath) { const auto tp00 = tp(0, 0); const auto tp01 = tp(0, 1); const model::revision_id rev0{1}; + register_in_topic_tables(tp00.topic, rev0); const auto tp10 = tp(1, 0); const model::revision_id rev1{2}; + register_in_topic_tables(tp10.topic, rev1); leader.ensure_table(tp00.topic, rev0); pairs_t total_expected_00; @@ -397,6 +419,7 @@ TEST_F(CoordinatorTest, TestLastAddedHappyPath) { const auto tp00 = tp(0, 0); const auto tp01 = tp(0, 1); const model::revision_id rev{1}; + register_in_topic_tables(tp00.topic, rev); leader.ensure_table(tp00.topic, rev); pairs_t total_expected_00; for (const auto& v : @@ -430,6 +453,7 @@ TEST_F(CoordinatorTest, TestNotLeader) { auto& non_leader = non_leader_opt->get(); const auto tp00 = tp(0, 0); const model::revision_id rev{1}; + register_in_topic_tables(tp00.topic, rev); leader_opt.value().get().ensure_table(tp00.topic, rev); pairs_t total_expected_00; @@ -458,6 +482,7 @@ TEST_P(CoordinatorTestWithParams, TestConcurrentAddFiles) { } const auto tp00 = tp(0, 0); const model::revision_id rev0{1}; + register_in_topic_tables(tp00.topic, rev0); bool done = false; std::vector> adders; int fiber_id = 0; @@ -566,6 +591,7 @@ TEST_F(CoordinatorLoopTest, TestCommitFilesHappyPath) { auto& leader = leader_opt->get(); const auto tp00 = tp(0, 0); const model::revision_id rev0{1}; + register_in_topic_tables(tp00.topic, rev0); leader.ensure_table(tp00.topic, rev0); auto add_res = leader.crd .sync_add_files(tp00, rev0, make_pending_files({{0, 100}})) @@ -596,6 +622,7 @@ TEST_F(CoordinatorLoopTest, TestCommitFilesNotLeader) { auto& leader = leader_opt->get(); const auto tp00 = tp(0, 0); const model::revision_id rev0{1}; + register_in_topic_tables(tp00.topic, rev0); leader.ensure_table(tp00.topic, rev0); auto add_res = leader.crd .sync_add_files(tp00, rev0, make_pending_files({{0, 100}})) @@ -677,6 +704,7 @@ 
TEST_F(CoordinatorSleepingLoopTest, TestQuickShutdownOnLeadershipChange) { for (int i = 0; i < 100; i++) { auto t = tp(i, 0); auto rev = model::revision_id{i}; + register_in_topic_tables(t.topic, rev); leader.ensure_table(t.topic, rev); auto add_res = leader.crd .sync_add_files(t, rev, make_pending_files({{0, 100}})) diff --git a/src/v/datalake/coordinator/tests/iceberg_file_committer_test.cc b/src/v/datalake/coordinator/tests/iceberg_file_committer_test.cc index f2bbed30581f3..a19c26f42fa35 100644 --- a/src/v/datalake/coordinator/tests/iceberg_file_committer_test.cc +++ b/src/v/datalake/coordinator/tests/iceberg_file_committer_test.cc @@ -14,6 +14,7 @@ #include "datalake/coordinator/iceberg_file_committer.h" #include "datalake/coordinator/tests/state_test_utils.h" #include "datalake/table_definition.h" +#include "datalake/tests/test_utils.h" #include "iceberg/filesystem_catalog.h" #include "iceberg/manifest_entry.h" #include "iceberg/manifest_io.h" diff --git a/src/v/datalake/coordinator/tests/state_machine_test.cc b/src/v/datalake/coordinator/tests/state_machine_test.cc index 88de4b7b9d8ee..cc9eb2783e530 100644 --- a/src/v/datalake/coordinator/tests/state_machine_test.cc +++ b/src/v/datalake/coordinator/tests/state_machine_test.cc @@ -37,6 +37,10 @@ struct coordinator_stm_fixture : stm_raft_fixture { return config::mock_binding(1s); } + config::binding default_partition_spec() const { + return config::mock_binding("(hour(redpanda.timestamp))"); + } + stm_shptrs_t create_stms( state_machine_manager_builder& builder, raft_node_instance& node) override { @@ -58,7 +62,8 @@ struct coordinator_stm_fixture : stm_raft_fixture { return remove_tombstone(t, r); }, file_committer, - commit_interval()); + commit_interval(), + default_partition_spec()); coordinators[node.get_vnode()]->start(); return ss::now(); }); @@ -122,11 +127,20 @@ struct coordinator_stm_fixture : stm_raft_fixture { model::topic_partition random_tp() const { return { - model::topic{"test"}, + tp_ns.tp, 
model::partition_id( random_generators::get_int(0, max_partitions - 1))}; } + ss::future<> register_in_topic_table() { + auto topic_cfg = cluster::topic_configuration( + tp_ns.ns, tp_ns.tp, /*partition_count=*/1, /*replication_factor=*/1); + auto tt_res = co_await topic_table.apply( + cluster::create_topic_cmd{tp_ns, {topic_cfg, {}}}, + model::offset{rev()}); + ASSERT_EQ_CORO(tt_res, cluster::errc::success); + } + ss::future< checked> remove_tombstone(const model::topic&, model::revision_id) { @@ -134,7 +148,8 @@ struct coordinator_stm_fixture : stm_raft_fixture { } static constexpr int32_t max_partitions = 5; - model::topic_partition tp{model::topic{"test"}, model::partition_id{0}}; + model::topic_namespace tp_ns{model::kafka_namespace, model::topic{"test"}}; + model::topic_partition tp{tp_ns.tp, model::partition_id{0}}; model::revision_id rev{123}; cluster::data_migrations::migrated_resources mr; cluster::topic_table topic_table{mr}; @@ -147,6 +162,7 @@ struct coordinator_stm_fixture : stm_raft_fixture { TEST_F_CORO(coordinator_stm_fixture, test_snapshots) { co_await initialize(); co_await wait_for_leader(5s); + co_await register_in_topic_table(); // populate some data until the state machine is snapshotted // a few times @@ -224,7 +240,7 @@ TEST_F_CORO(coordinator_stm_fixture, test_snapshots) { for (int32_t pid = 0; pid < max_partitions; pid++) { auto committed_offsets = last_committed_offsets( - {model::topic{"test"}, model::partition_id{pid}}); + {tp_ns.tp, model::partition_id{pid}}); vlog(logger().info, "committed offsets: {}", committed_offsets); ASSERT_TRUE_CORO(std::equal( committed_offsets.begin() + 1, diff --git a/src/v/datalake/table_definition.cc b/src/v/datalake/table_definition.cc index 634de8a649ef2..d2fab622a0f4c 100644 --- a/src/v/datalake/table_definition.cc +++ b/src/v/datalake/table_definition.cc @@ -9,8 +9,6 @@ */ #include "datalake/table_definition.h" -#include "iceberg/transform.h" - namespace datalake { using namespace iceberg; struct_type 
schemaless_struct_type() { @@ -54,16 +52,4 @@ schema default_schema() { }; } -unresolved_partition_spec hour_partition_spec() { - chunked_vector fields; - fields.push_back({ - .source_name = {"redpanda", "timestamp"}, - .transform = hour_transform{}, - .name = "redpanda_timestamp_hour", - }); - return { - .fields = std::move(fields), - }; -} - } // namespace datalake diff --git a/src/v/datalake/table_definition.h b/src/v/datalake/table_definition.h index 4c0c365e8e503..6eb8f70791f01 100644 --- a/src/v/datalake/table_definition.h +++ b/src/v/datalake/table_definition.h @@ -9,7 +9,6 @@ */ #pragma once -#include "iceberg/partition.h" #include "iceberg/schema.h" namespace datalake { @@ -22,7 +21,4 @@ iceberg::struct_type schemaless_struct_type(); iceberg::schema default_schema(); inline constexpr std::string_view rp_struct_name = "redpanda"; -// Hourly partitioning on the timestamp of a schema with the above fields. -iceberg::unresolved_partition_spec hour_partition_spec(); - } // namespace datalake diff --git a/src/v/datalake/tests/BUILD b/src/v/datalake/tests/BUILD index 2c8030dbdd6e7..921ae0e49f2c1 100644 --- a/src/v/datalake/tests/BUILD +++ b/src/v/datalake/tests/BUILD @@ -152,7 +152,7 @@ redpanda_test_cc_library( "//src/v/datalake:table_id_provider", ], include_prefix = "datalake/tests", - visibility = [":__subpackages__"], + visibility = ["//visibility:public"], deps = [ "//src/v/datalake:catalog_schema_manager", "//src/v/datalake:record_schema_resolver", @@ -217,6 +217,7 @@ redpanda_cc_gtest( ], deps = [ ":test_data_writer", + ":test_utils", "//src/v/bytes", "//src/v/datalake:partitioning_writer", "//src/v/datalake:table_definition", diff --git a/src/v/datalake/tests/CMakeLists.txt b/src/v/datalake/tests/CMakeLists.txt index e94963febef66..cc0d683d20a65 100644 --- a/src/v/datalake/tests/CMakeLists.txt +++ b/src/v/datalake/tests/CMakeLists.txt @@ -50,6 +50,7 @@ rp_test( v::bytes v::utils v::gtest_main + v::datalake_test_utils v::datalake_writer 
v::iceberg_test_utils v::schema diff --git a/src/v/datalake/tests/partitioning_writer_test.cc b/src/v/datalake/tests/partitioning_writer_test.cc index 3b4c38216a029..dd01109cf5eed 100644 --- a/src/v/datalake/tests/partitioning_writer_test.cc +++ b/src/v/datalake/tests/partitioning_writer_test.cc @@ -11,6 +11,7 @@ #include "datalake/partitioning_writer.h" #include "datalake/table_definition.h" #include "datalake/tests/test_data_writer.h" +#include "datalake/tests/test_utils.h" #include "iceberg/tests/test_schemas.h" #include "iceberg/tests/value_generator.h" #include "model/timestamp.h" diff --git a/src/v/datalake/tests/test_utils.cc b/src/v/datalake/tests/test_utils.cc index ef620bab0318e..a2e17228af0fc 100644 --- a/src/v/datalake/tests/test_utils.cc +++ b/src/v/datalake/tests/test_utils.cc @@ -11,13 +11,24 @@ #include "datalake/tests/test_utils.h" #include "datalake/record_translator.h" -#include "datalake/table_definition.h" #include "datalake/table_id_provider.h" #include namespace datalake { +iceberg::unresolved_partition_spec hour_partition_spec() { + chunked_vector fields; + fields.push_back({ + .source_name = {"redpanda", "timestamp"}, + .transform = iceberg::hour_transform{}, + .name = "redpanda.timestamp_hour", + }); + return { + .fields = std::move(fields), + }; +} + direct_table_creator::direct_table_creator( type_resolver& tr, schema_manager& sm) : type_resolver_(tr) diff --git a/src/v/datalake/tests/test_utils.h b/src/v/datalake/tests/test_utils.h index f0339d26d61b0..6d73b38bcf2bd 100644 --- a/src/v/datalake/tests/test_utils.h +++ b/src/v/datalake/tests/test_utils.h @@ -15,6 +15,9 @@ namespace datalake { +// Hourly partitioning on the redpanda.timestamp field. +iceberg::unresolved_partition_spec hour_partition_spec(); + // Creates or alters the table by interfacing directly with a catalog. 
class direct_table_creator : public table_creator { public: diff --git a/src/v/iceberg/BUILD b/src/v/iceberg/BUILD index c782573dd31ef..2979804f535b9 100644 --- a/src/v/iceberg/BUILD +++ b/src/v/iceberg/BUILD @@ -513,6 +513,7 @@ redpanda_cc_library( deps = [ ":datatypes", ":transform", + ":unresolved_partition_spec", "//src/v/container:fragmented_vector", "//src/v/utils:named_type", "@seastar", @@ -953,6 +954,7 @@ redpanda_cc_library( include_prefix = "iceberg", deps = [ ":values", + "@seastar", ], ) @@ -1036,6 +1038,24 @@ redpanda_cc_library( ], ) +redpanda_cc_library( + name = "unresolved_partition_spec", + srcs = [ + "unresolved_partition_spec.cc", + ], + hdrs = [ + "unresolved_partition_spec.h", + ], + include_prefix = "iceberg", + visibility = ["//visibility:public"], + deps = [ + ":transform", + "//src/v/base", + "//src/v/container:fragmented_vector", + "@seastar", + ], +) + redpanda_cc_library( name = "update_schema_action", srcs = [ diff --git a/src/v/iceberg/CMakeLists.txt b/src/v/iceberg/CMakeLists.txt index 957b0da1c9292..6fa6df695d184 100644 --- a/src/v/iceberg/CMakeLists.txt +++ b/src/v/iceberg/CMakeLists.txt @@ -24,6 +24,17 @@ foreach (schema ${schemas}) list(APPEND avro_hdrs ${avro_hdr}) endforeach() +v_cc_library( + NAME iceberg_unresolved_partition_spec + SRCS + transform.cc + unresolved_partition_spec.cc + DEPS + Seastar::seastar + v::base + v::container +) + v_cc_library( NAME iceberg SRCS @@ -67,7 +78,6 @@ v_cc_library( time_transform_visitor.cc table_identifier.cc transaction.cc - transform.cc transform_json.cc transform_utils.cc update_schema_action.cc @@ -86,6 +96,7 @@ v_cc_library( v::bytes v::cloud_io v::container + v::iceberg_unresolved_partition_spec v::json v::strings v::utils diff --git a/src/v/iceberg/partition.cc b/src/v/iceberg/partition.cc index 2a6ca71032230..1b7ba1a84e99a 100644 --- a/src/v/iceberg/partition.cc +++ b/src/v/iceberg/partition.cc @@ -11,22 +11,6 @@ namespace iceberg { -std::ostream& -operator<<(std::ostream& o, const 
unresolved_partition_spec::field& f) { - fmt::print( - o, - "{{source_name: {}, transform: {}, name: {}}}", - f.source_name, - f.transform, - f.name); - return o; -} - -std::ostream& operator<<(std::ostream& o, const unresolved_partition_spec& ps) { - fmt::print(o, "{{fields: {}}}", ps.fields); - return o; -} - std::ostream& operator<<(std::ostream& o, const partition_field& f) { fmt::print( o, diff --git a/src/v/iceberg/partition.h b/src/v/iceberg/partition.h index cf1ccad58f0e9..9a814ad315f8e 100644 --- a/src/v/iceberg/partition.h +++ b/src/v/iceberg/partition.h @@ -11,29 +11,13 @@ #include "container/fragmented_vector.h" #include "iceberg/datatypes.h" #include "iceberg/transform.h" +#include "iceberg/unresolved_partition_spec.h" #include "utils/named_type.h" #include namespace iceberg { -struct unresolved_partition_spec { - struct field { - // Components of the nested source field name, in increasing depth - // order. - std::vector source_name; - transform transform; - ss::sstring name; - - friend std::ostream& operator<<(std::ostream&, const field&); - }; - - chunked_vector fields; - - friend std::ostream& - operator<<(std::ostream&, const unresolved_partition_spec&); -}; - struct partition_field { using id_t = named_type; nested_field::id_t source_id; diff --git a/src/v/iceberg/rest_client/catalog_client.cc b/src/v/iceberg/rest_client/catalog_client.cc index 668ff484ec84f..5cbbeacd561d2 100644 --- a/src/v/iceberg/rest_client/catalog_client.cc +++ b/src/v/iceberg/rest_client/catalog_client.cc @@ -14,6 +14,7 @@ #include "http/request_builder.h" #include "http/utils.h" #include "iceberg/json_writer.h" +#include "iceberg/logger.h" #include "iceberg/rest_client/entities.h" #include "iceberg/rest_client/json.h" #include "iceberg/table_requests_json.h" @@ -216,6 +217,13 @@ ss::future> catalog_client::create_table( .with_bearer_auth(token.value()) .with_content_type(json_content_type); + auto json_buf = serialize_payload_as_json(req); + ss::sstring json_str; + for 
(const auto& frag : json_buf) { + json_str.append(frag.get(), frag.size()); + } + vlog(log.info, "FFF {}", json_str); + co_return (co_await perform_request( rtc, http_request, serialize_payload_as_json(req))) .and_then(parse_json) diff --git a/src/v/iceberg/tests/partition_test.cc b/src/v/iceberg/tests/partition_test.cc index 51e31ef7c5c50..cadba38520ac5 100644 --- a/src/v/iceberg/tests/partition_test.cc +++ b/src/v/iceberg/tests/partition_test.cc @@ -234,3 +234,62 @@ TEST(PartitionTest, TestSpecResolve) { ASSERT_EQ(resolved, std::nullopt); } } + +TEST(PartitionTest, TestSpecParse) { + { + auto res = unresolved_partition_spec::parse("()"); + ASSERT_FALSE(res.has_error()) << res.error(); + ASSERT_EQ(res.value(), unresolved_partition_spec{}); + } + + { + auto res = unresolved_partition_spec::parse("(foo)"); + ASSERT_FALSE(res.has_error()) << res.error(); + auto expected = chunked_vector{ + unresolved_partition_spec::field{ + .source_name = {"foo"}, + .transform = identity_transform{}, + .name = "foo"}, + }; + ASSERT_EQ( + res.value(), + unresolved_partition_spec{.fields = std::move(expected)}); + } + + { + auto res = unresolved_partition_spec::parse(" (foo.bar, baz ) "); + ASSERT_FALSE(res.has_error()) << res.error(); + auto expected = chunked_vector{ + unresolved_partition_spec::field{ + .source_name = {"foo", "bar"}, + .transform = identity_transform{}, + .name = "foo.bar"}, + unresolved_partition_spec::field{ + .source_name = {"baz"}, + .transform = identity_transform{}, + .name = "baz"}, + }; + ASSERT_EQ( + res.value(), + unresolved_partition_spec{.fields = std::move(expected)}); + } + + { + auto res = unresolved_partition_spec::parse( + " (hour(redpanda.timestamp), day(my_ts) as my_day )"); + ASSERT_FALSE(res.has_error()) << res.error(); + auto expected = chunked_vector{ + unresolved_partition_spec::field{ + .source_name = {"redpanda", "timestamp"}, + .transform = hour_transform{}, + .name = "redpanda.timestamp_hour"}, + unresolved_partition_spec::field{ + 
.source_name = {"my_ts"}, + .transform = day_transform{}, + .name = "my_day"}, + }; + ASSERT_EQ( + res.value(), + unresolved_partition_spec{.fields = std::move(expected)}); + } +} diff --git a/src/v/iceberg/time_transform_visitor.cc b/src/v/iceberg/time_transform_visitor.cc index c6ad8c1aba3ba..286c224e7e56f 100644 --- a/src/v/iceberg/time_transform_visitor.cc +++ b/src/v/iceberg/time_transform_visitor.cc @@ -10,15 +10,26 @@ #include "iceberg/values.h" +#include + namespace iceberg { namespace { + +static constexpr int64_t micros_per_s = 1'000'000; + int32_t micros_to_hr(int64_t micros) { static constexpr int64_t s_per_hr = 3600; - static constexpr int64_t micros_per_s = 1000000; static constexpr int64_t micros_per_hr = micros_per_s * s_per_hr; return static_cast(micros / micros_per_hr); } + +int32_t micros_to_day(int64_t micros) { + static constexpr int64_t s_per_day = 86400; + static constexpr int64_t micros_per_day = micros_per_s * s_per_day; + return static_cast(micros / micros_per_day); +} + } // namespace int32_t hour_transform_visitor::operator()(const primitive_value& v) { @@ -35,4 +46,15 @@ int32_t hour_transform_visitor::operator()(const primitive_value& v) { fmt::format("hourly_visitor not implemented for primitive value {}", v)); } +int32_t day_transform_visitor::operator()(const primitive_value& v) { + return ss::visit( + v, + [](const timestamp_value& v) { return micros_to_day(v.val); }, + [](const timestamptz_value& v) { return micros_to_day(v.val); }, + [](const auto& v) -> int32_t { + throw std::invalid_argument(fmt::format( + "day_transform_visitor not implemented for primitive value {}", v)); + }); +} + } // namespace iceberg diff --git a/src/v/iceberg/time_transform_visitor.h b/src/v/iceberg/time_transform_visitor.h index 749027a326d1f..b167d00cde87e 100644 --- a/src/v/iceberg/time_transform_visitor.h +++ b/src/v/iceberg/time_transform_visitor.h @@ -22,4 +22,14 @@ struct hour_transform_visitor { } }; +struct day_transform_visitor { + int32_t 
operator()(const primitive_value& v); + + template + int32_t operator()(const T& t) { + throw std::invalid_argument( + fmt::format("day_transform_visitor not implemented for value {}", t)); + } +}; + } // namespace iceberg diff --git a/src/v/iceberg/transform_utils.cc b/src/v/iceberg/transform_utils.cc index 84af4a4826d1e..45bf00685c80e 100644 --- a/src/v/iceberg/transform_utils.cc +++ b/src/v/iceberg/transform_utils.cc @@ -33,6 +33,11 @@ struct transform_applying_visitor { return v; } + value operator()(const day_transform&) { + int_value v{std::visit(day_transform_visitor{}, source_val_)}; + return v; + } + template value operator()(const T&) { throw std::invalid_argument( diff --git a/src/v/iceberg/unresolved_partition_spec.cc b/src/v/iceberg/unresolved_partition_spec.cc new file mode 100644 index 0000000000000..98eefceefcc88 --- /dev/null +++ b/src/v/iceberg/unresolved_partition_spec.cc @@ -0,0 +1,292 @@ +// Copyright 2024 Redpanda Data, Inc. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.md +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0 + +#include "iceberg/unresolved_partition_spec.h" + +namespace iceberg { + +std::ostream& +operator<<(std::ostream& o, const unresolved_partition_spec::field& f) { + fmt::print( + o, + "{{source_name: {}, transform: {}, name: {}}}", + f.source_name, + f.transform, + f.name); + return o; +} + +std::ostream& operator<<(std::ostream& o, const unresolved_partition_spec& ps) { + fmt::print(o, "{{fields: {}}}", ps.fields); + return o; +} + +namespace { + +template +struct parse_result { + T val; + std::string_view unparsed; +}; + +struct parse_ctx { + std::string_view original; + ss::sstring last_error; + + void report_expected(std::string_view unparsed, std::string_view expected) { + last_error = fmt::format( + "col {}: expected {} (got 
instead: `{}')", + unparsed.data() - original.data(), + expected, + unparsed); + } +}; + +bool skip_space(std::string_view& str) { + auto it = str.begin(); + while (it != str.end() && std::isspace(*it)) { + ++it; + } + const bool skipped = it != str.begin(); + str = std::string_view{it, str.end()}; + return skipped; +} + +bool skip_expected(std::string_view& str, const std::string_view& expected) { + if (!str.starts_with(expected)) { + return false; + } + str.remove_prefix(expected.length()); + return true; +} + +std::optional> +parse_identifier(const std::string_view& str) { + auto it = str.begin(); + while (it != str.end() && (*it == '_' || std::isalnum(*it))) { + ++it; + } + + if (it == str.begin()) { + return std::nullopt; + } + + return parse_result{ + .val = ss::sstring{str.begin(), it}, + .unparsed = std::string_view{it, str.end()}, + }; +} + +std::optional>> +parse_qualified_identifier(const std::string_view& str) { + auto unparsed = str; + + std::vector result; + while (true) { + if (!result.empty()) { + if (!skip_expected(unparsed, ".")) { + break; + } + } + + auto id = parse_identifier(unparsed); + if (!id) { + break; + } + result.push_back(id->val); + unparsed = id->unparsed; + } + + if (result.empty()) { + return std::nullopt; + } + + return parse_result>{ + .val = std::move(result), + .unparsed = unparsed, + }; +} + +struct transform_field { + std::vector source; + transform transform; +}; + +std::optional> +parse_transform_field(const std::string_view& str, parse_ctx& ctx) { + auto unparsed = str; + + auto transform_id = parse_identifier(unparsed); + if (!transform_id) { + return std::nullopt; + } + transform transform; + if (transform_id->val == "hour") { + transform = hour_transform{}; + } else if (transform_id->val == "day") { + transform = day_transform{}; + } else if (transform_id->val == "identity") { + transform = identity_transform{}; + } else { + ctx.report_expected(unparsed, "known transform name"); + return std::nullopt; + } + unparsed = 
transform_id->unparsed; + + skip_space(unparsed); + if (!skip_expected(unparsed, "(")) { + ctx.report_expected(unparsed, "'('"); + return std::nullopt; + } + + auto source = parse_qualified_identifier(unparsed); + if (!source) { + ctx.report_expected(unparsed, "qualified identifier"); + return std::nullopt; + } + unparsed = source->unparsed; + + skip_space(unparsed); + if (!skip_expected(unparsed, ")")) { + ctx.report_expected(unparsed, "')'"); + return std::nullopt; + } + + auto result = transform_field{ + .source = std::move(source->val), + .transform = transform, + }; + + return parse_result{ + .val = std::move(result), + .unparsed = unparsed, + }; +} + +std::optional> +parse_partition_field(const std::string_view& str, parse_ctx& ctx) { + auto unparsed = str; + skip_space(unparsed); + + transform_field tf; + if (auto parsed_tf = parse_transform_field(unparsed, ctx); parsed_tf) { + tf = std::move(parsed_tf->val); + unparsed = parsed_tf->unparsed; + } else if (auto parsed_sf = parse_qualified_identifier(unparsed); + parsed_sf) { + tf.source = std::move(parsed_sf->val); + tf.transform = identity_transform{}; + unparsed = parsed_sf->unparsed; + } else { + ctx.report_expected(unparsed, "qualified identifier or transform"); + return std::nullopt; + } + + ss::sstring source_field_str; + if ( + skip_space(unparsed) + && (skip_expected(unparsed, "AS") || skip_expected(unparsed, "as"))) { + if (!skip_space(unparsed)) { + ctx.report_expected(unparsed, "whitespace"); + return std::nullopt; + } + + auto id = parse_identifier(unparsed); + if (!id) { + ctx.report_expected(unparsed, "identifier"); + return std::nullopt; + } + source_field_str = std::move(id->val); + unparsed = id->unparsed; + } else { + // TODO: better + bool first = true; + for (const auto& id : tf.source) { + if (!first) { + source_field_str += "."; + } else { + first = false; + } + source_field_str += id; + } + + if (tf.transform != identity_transform{}) { + source_field_str += fmt::format("_{}", 
tf.transform); + } + } + + unresolved_partition_spec::field val{ + .source_name = std::move(tf.source), + .transform = tf.transform, + .name = std::move(source_field_str), + }; + + return parse_result{ + .val = std::move(val), + .unparsed = unparsed, + }; +} + +std::optional> +parse_partition_field_list(const std::string_view& str, parse_ctx& ctx) { + auto unparsed = str; + skip_space(unparsed); + + if (!skip_expected(unparsed, "(")) { + ctx.report_expected(unparsed, "'('"); + return std::nullopt; + } + + unresolved_partition_spec result; + while (true) { + if (!result.fields.empty()) { + skip_space(unparsed); + if (!skip_expected(unparsed, ",")) { + ctx.report_expected(unparsed, ","); + break; + } + } + + auto field = parse_partition_field(unparsed, ctx); + if (!field) { + break; + } + result.fields.push_back(field->val); + unparsed = field->unparsed; + } + + skip_space(unparsed); + if (!skip_expected(unparsed, ")")) { + ctx.report_expected(unparsed, "')'"); + return std::nullopt; + } + + return parse_result{ + .val = std::move(result), + .unparsed = unparsed, + }; +} + +} // namespace + +checked +unresolved_partition_spec::parse(const std::string_view& str) { + parse_ctx ctx{.original = str}; + auto res = parse_partition_field_list(str, ctx); + if (!res) { + return ctx.last_error; + } + skip_space(res->unparsed); + if (!res->unparsed.empty()) { + return fmt::format("unparsed: `{}'", res->unparsed); + } + return std::move(res->val); +} + +} // namespace iceberg diff --git a/src/v/iceberg/unresolved_partition_spec.h b/src/v/iceberg/unresolved_partition_spec.h new file mode 100644 index 0000000000000..cbf9cbeb32005 --- /dev/null +++ b/src/v/iceberg/unresolved_partition_spec.h @@ -0,0 +1,42 @@ +// Copyright 2024 Redpanda Data, Inc. 
+// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.md +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0 +#pragma once + +#include "base/outcome.h" +#include "container/fragmented_vector.h" +#include "iceberg/transform.h" + +namespace iceberg { + +struct unresolved_partition_spec { + struct field { + // Components of the nested source field name, in increasing depth + // order. + std::vector source_name; + transform transform; + ss::sstring name; + + friend bool operator==(const field&, const field&) = default; + friend std::ostream& operator<<(std::ostream&, const field&); + }; + + chunked_vector fields; + + friend bool operator==( + const unresolved_partition_spec&, const unresolved_partition_spec&) + = default; + + friend std::ostream& + operator<<(std::ostream&, const unresolved_partition_spec&); + + static checked + parse(const std::string_view&); +}; + +} // namespace iceberg diff --git a/src/v/kafka/CMakeLists.txt b/src/v/kafka/CMakeLists.txt index aac6d12d02a62..998525241d1f1 100644 --- a/src/v/kafka/CMakeLists.txt +++ b/src/v/kafka/CMakeLists.txt @@ -62,6 +62,7 @@ v_cc_library( v::bytes v::rpc v::cluster + v::iceberg_unresolved_partition_spec v::kafka_partition_proxy v::kafka_protocol v::security diff --git a/src/v/kafka/server/BUILD b/src/v/kafka/server/BUILD index 8214764ca415d..3ec1e0f700893 100644 --- a/src/v/kafka/server/BUILD +++ b/src/v/kafka/server/BUILD @@ -176,6 +176,7 @@ redpanda_cc_library( "//src/v/features", "//src/v/hashing:jump_consistent", "//src/v/hashing:xx", + "//src/v/iceberg:unresolved_partition_spec", "//src/v/kafka/data:partition_proxy", "//src/v/kafka/protocol", "//src/v/kafka/protocol:add_offsets_to_txn", diff --git a/src/v/kafka/server/handlers/alter_configs.cc b/src/v/kafka/server/handlers/alter_configs.cc index 3dddacada1401..8d4c6d089328e 
100644 --- a/src/v/kafka/server/handlers/alter_configs.cc +++ b/src/v/kafka/server/handlers/alter_configs.cc @@ -83,7 +83,7 @@ create_topic_properties_update( std::apply(apply_op(op_t::none), update.custom_properties.serde_fields()); static_assert( - std::tuple_size_v == 32, + std::tuple_size_v == 33, "If you added a property, please decide on it's default alter config " "policy, and handle the update in the loop below"); static_assert( @@ -370,6 +370,13 @@ create_topic_properties_update( kafka::config_resource_operation::set); continue; } + if (cfg.name == topic_property_iceberg_partition_spec) { + parse_and_set_optional( + update.properties.iceberg_partition_spec, + cfg.value, + kafka::config_resource_operation::set); + continue; + } } catch (const validation_error& e) { return make_error_alter_config_resource_response< diff --git a/src/v/kafka/server/handlers/configs/config_response_utils.cc b/src/v/kafka/server/handlers/configs/config_response_utils.cc index dadf54560768e..75cea61d922e8 100644 --- a/src/v/kafka/server/handlers/configs/config_response_utils.cc +++ b/src/v/kafka/server/handlers/configs/config_response_utils.cc @@ -992,6 +992,21 @@ config_response_container_t make_topic_configs( "topic."), &describe_as_string); + if (topic_properties.iceberg_mode != model::iceberg_mode::disabled) { + add_topic_config_if_requested( + config_keys, + result, + config::shard_local_cfg().iceberg_default_partition_spec.name(), + config::shard_local_cfg().iceberg_default_partition_spec(), + topic_property_iceberg_partition_spec, + topic_properties.iceberg_partition_spec, + include_synonyms, + maybe_make_documentation( + include_documentation, + "Partition spec of the corresponding Iceberg table."), + &describe_as_string); + } + return result; } diff --git a/src/v/kafka/server/handlers/create_topics.cc b/src/v/kafka/server/handlers/create_topics.cc index 56db192b3ad2c..d27dd9d6a9f30 100644 --- a/src/v/kafka/server/handlers/create_topics.cc +++ 
b/src/v/kafka/server/handlers/create_topics.cc @@ -76,7 +76,8 @@ bool is_supported(std::string_view name) { topic_property_iceberg_mode, topic_property_leaders_preference, topic_property_delete_retention_ms, - topic_property_iceberg_delete}); + topic_property_iceberg_delete, + topic_property_iceberg_partition_spec}); if (std::any_of( supported_configs.begin(), diff --git a/src/v/kafka/server/handlers/incremental_alter_configs.cc b/src/v/kafka/server/handlers/incremental_alter_configs.cc index 30f866fe71e9a..1c2c649792600 100644 --- a/src/v/kafka/server/handlers/incremental_alter_configs.cc +++ b/src/v/kafka/server/handlers/incremental_alter_configs.cc @@ -374,6 +374,11 @@ create_topic_properties_update( update.properties.iceberg_delete, cfg.value, op); continue; } + if (cfg.name == topic_property_iceberg_partition_spec) { + parse_and_set_optional( + update.properties.iceberg_partition_spec, cfg.value, op); + continue; + } } catch (const validation_error& e) { vlog( diff --git a/src/v/kafka/server/handlers/topics/types.cc b/src/v/kafka/server/handlers/topics/types.cc index 3e1d42941d011..dc5d1d3dac2a9 100644 --- a/src/v/kafka/server/handlers/topics/types.cc +++ b/src/v/kafka/server/handlers/topics/types.cc @@ -272,6 +272,9 @@ to_cluster_type(const creatable_topic& t) { cfg.properties.iceberg_delete = get_bool_value( config_entries, topic_property_iceberg_delete); + cfg.properties.iceberg_partition_spec = get_string_value( + config_entries, topic_property_iceberg_partition_spec); + schema_id_validation_config_parser schema_id_validation_config_parser{ cfg.properties}; diff --git a/src/v/kafka/server/handlers/topics/types.h b/src/v/kafka/server/handlers/topics/types.h index ab3304353791c..4223896b046c0 100644 --- a/src/v/kafka/server/handlers/topics/types.h +++ b/src/v/kafka/server/handlers/topics/types.h @@ -108,6 +108,9 @@ inline constexpr std::string_view topic_property_cloud_topic_enabled inline constexpr std::string_view topic_property_iceberg_delete = 
"redpanda.iceberg.delete"; +inline constexpr std::string_view topic_property_iceberg_partition_spec + = "redpanda.iceberg.partition.spec"; + // Kafka topic properties that is not relevant for Redpanda // Or cannot be altered with kafka alter handler inline constexpr std::array allowlist_topic_noop_confs = { diff --git a/src/v/kafka/server/handlers/topics/validators.h b/src/v/kafka/server/handlers/topics/validators.h index 79b26d4a14bb5..21162bba2f604 100644 --- a/src/v/kafka/server/handlers/topics/validators.h +++ b/src/v/kafka/server/handlers/topics/validators.h @@ -11,6 +11,7 @@ #pragma once #include "config/configuration.h" +#include "iceberg/unresolved_partition_spec.h" #include "kafka/protocol/schemata/create_topics_request.h" #include "kafka/protocol/schemata/create_topics_response.h" #include "kafka/server/handlers/topics/types.h" @@ -276,22 +277,37 @@ struct iceberg_config_validator { static constexpr error_code ec = error_code::invalid_config; static bool is_valid(const creatable_topic& c) { - auto it = std::find_if( + model::iceberg_mode parsed_mode = model::iceberg_mode::disabled; + + auto mode_it = std::find_if( c.configs.begin(), c.configs.end(), [](const createable_topic_config& cfg) { return cfg.name == topic_property_iceberg_mode; }); - if (it == c.configs.end() || !it->value.has_value()) { - return true; + if (mode_it != c.configs.end() && mode_it->value.has_value()) { + try { + parsed_mode = boost::lexical_cast( + mode_it->value.value()); + } catch (...) { + return false; + } } - model::iceberg_mode parsed_mode; - try { - parsed_mode = boost::lexical_cast( - it->value.value()); - } catch (...) 
{ - return false; + + auto pspec_it = std::find_if( + c.configs.begin(), + c.configs.end(), + [](const createable_topic_config& cfg) { + return cfg.name == topic_property_iceberg_partition_spec; + }); + if (pspec_it != c.configs.end() && pspec_it->value.has_value()) { + auto parsed = iceberg::unresolved_partition_spec::parse( + pspec_it->value.value()); + if (!parsed.has_value()) { + return false; + } } + // If iceberg is enabled at the cluster level, the topic can // be created with any override. If it is disabled // at the cluster level, it cannot be enabled with a topic diff --git a/tests/rptest/tests/describe_topics_test.py b/tests/rptest/tests/describe_topics_test.py index 2ce2b80f2d7e1..33083dc9b3665 100644 --- a/tests/rptest/tests/describe_topics_test.py +++ b/tests/rptest/tests/describe_topics_test.py @@ -301,6 +301,12 @@ def test_describe_topics_with_documentation_and_types(self): doc_string= "If true, delete the corresponding Iceberg table when deleting the topic." ), + "redpanda.iceberg.partition.spec": + ConfigProperty( + config_type="STRING", + value="", + doc_string="Partition spec of the corresponding Iceberg table." 
+ ), } tp_spec = TopicSpec() diff --git a/tools/offline_log_viewer/controller.py b/tools/offline_log_viewer/controller.py index 687f6832f83ab..c2bb77e2a71bf 100644 --- a/tools/offline_log_viewer/controller.py +++ b/tools/offline_log_viewer/controller.py @@ -144,6 +144,10 @@ def read_topic_properties_serde(rdr: Reader, version): 'delete_retention_ms': rdr.read_tristate(Reader.read_int64), 'iceberg_delete': rdr.read_optional(Reader.read_bool), } + if version >= 11: + topic_properties |= { + 'iceberg_partition_spec': rdr.read_optional(Reader.read_string), + } return topic_properties @@ -308,6 +312,11 @@ def incr_topic_upd(rdr: Reader, version): rdr.read_optional(read_leaders_preference), 'iceberg_delete': rdr.read_optional(Reader.read_bool), } + if version >= 8: + incr_obj |= { + 'iceberg_partition_spec': + rdr.read_optional(Reader.read_string), + } return incr_obj