From 9f1de09e112d019fa8d10c3093f69798a219047b Mon Sep 17 00:00:00 2001 From: Alexey Zatelepin Date: Thu, 16 Jan 2025 14:31:21 +0100 Subject: [PATCH 01/15] config: introduce iceberg_default_partition spec cluster config property --- src/v/config/configuration.cc | 9 +++++++++ src/v/config/configuration.h | 1 + 2 files changed, 10 insertions(+) diff --git a/src/v/config/configuration.cc b/src/v/config/configuration.cc index b9a1ab180915..f4c9bd720274 100644 --- a/src/v/config/configuration.cc +++ b/src/v/config/configuration.cc @@ -3793,6 +3793,15 @@ configuration::configuration() "the topic.", {.needs_restart = needs_restart::no, .visibility = visibility::user}, true) + , iceberg_default_partition_spec( + *this, + "iceberg_default_partition_spec", + "Default value for the redpanda.iceberg.partition.spec topic property " + "that determines the partition spec for the Iceberg table corresponding " + "to the topic.", + {.needs_restart = needs_restart::no, .visibility = visibility::user}, + "(hour(redpanda.timestamp))", + &validate_non_empty_string_opt) , development_enable_cloud_topics( *this, "development_enable_cloud_topics", diff --git a/src/v/config/configuration.h b/src/v/config/configuration.h index cea4325929f8..241a0c8f5ccb 100644 --- a/src/v/config/configuration.h +++ b/src/v/config/configuration.h @@ -719,6 +719,7 @@ struct configuration final : public config_store { property> iceberg_rest_catalog_prefix; property iceberg_delete; + property iceberg_default_partition_spec; configuration(); From 8626a358be186b2682b82d8d2e1703d7b026a917 Mon Sep 17 00:00:00 2001 From: Alexey Zatelepin Date: Thu, 16 Jan 2025 15:02:14 +0100 Subject: [PATCH 02/15] cluster: add iceberg_partition_spec topic property --- src/v/cloud_storage/tests/topic_manifest_test.cc | 1 + src/v/cluster/topic_properties.cc | 9 ++++++--- src/v/cluster/topic_properties.h | 12 ++++++++---- src/v/compat/cluster_generator.h | 1 + tools/offline_log_viewer/controller.py | 4 ++++ 5 files changed, 20 insertions(+), 7 deletions(-) diff --git a/src/v/cloud_storage/tests/topic_manifest_test.cc b/src/v/cloud_storage/tests/topic_manifest_test.cc index 515a1c6a1afb..746b08fbc177 100644 --- a/src/v/cloud_storage/tests/topic_manifest_test.cc +++ b/src/v/cloud_storage/tests/topic_manifest_test.cc @@ -484,6 +484,7 @@ SEASTAR_THREAD_TEST_CASE(test_topic_manifest_serde_feature_table) { false, tristate{}, std::nullopt, + std::nullopt, }; auto random_initial_revision_id diff --git a/src/v/cluster/topic_properties.cc b/src/v/cluster/topic_properties.cc index c6f7e6760753..ea7ede146151 100644 --- a/src/v/cluster/topic_properties.cc +++ b/src/v/cluster/topic_properties.cc @@ -42,7 +42,8 @@ std::ostream& operator<<(std::ostream& o, const topic_properties& properties) { "remote_label: {}, iceberg_mode: {}, " "leaders_preference: {}, " "delete_retention_ms: {}, " - "iceberg_delete: {}", + "iceberg_delete: {}, ", + "iceberg_partition_spec: {}", properties.compression, properties.cleanup_policy_bitflags, properties.compaction_strategy, @@ -79,7 +80,8 @@ std::ostream& operator<<(std::ostream& o, const topic_properties& properties) { properties.iceberg_mode, properties.leaders_preference, properties.delete_retention_ms, - properties.iceberg_delete); + properties.iceberg_delete, + properties.iceberg_partition_spec); if (config::shard_local_cfg().development_enable_cloud_topics()) { fmt::print( @@ -125,7 +127,7 @@ bool topic_properties::has_overrides() const { || flush_bytes.has_value() || remote_label.has_value() || (iceberg_mode != storage::ntp_config::default_iceberg_mode) || leaders_preference.has_value() || delete_retention_ms.is_engaged() - || iceberg_delete.has_value(); + || iceberg_delete.has_value() || iceberg_partition_spec.has_value(); if (config::shard_local_cfg().development_enable_cloud_topics()) { return overrides @@ -261,6 +263,7 @@ adl::from(iobuf_parser& parser) { false, tristate{disable_tristate}, std::nullopt, + std::nullopt, }; } diff --git a/src/v/cluster/topic_properties.h b/src/v/cluster/topic_properties.h index 93797c69ad49..a3ab7e739da4 100644 --- a/src/v/cluster/topic_properties.h +++ b/src/v/cluster/topic_properties.h @@ -33,7 +33,7 @@ namespace cluster { */ struct topic_properties : serde:: - envelope, serde::compat_version<0>> { + envelope, serde::compat_version<0>> { topic_properties() noexcept = default; topic_properties( std::optional compression, @@ -76,7 +76,8 @@ struct topic_properties std::optional leaders_preference, bool cloud_topic_enabled, tristate delete_retention_ms, - std::optional iceberg_delete) + std::optional iceberg_delete, + std::optional iceberg_partition_spec) : compression(compression) , cleanup_policy_bitflags(cleanup_policy_bitflags) , compaction_strategy(compaction_strategy) @@ -118,7 +119,8 @@ struct topic_properties , leaders_preference(std::move(leaders_preference)) , cloud_topic_enabled(cloud_topic_enabled) , delete_retention_ms(delete_retention_ms) - , iceberg_delete(iceberg_delete) {} + , iceberg_delete(iceberg_delete) + , iceberg_partition_spec(std::move(iceberg_partition_spec)) {} std::optional compression; std::optional cleanup_policy_bitflags; @@ -194,6 +196,7 @@ struct topic_properties tristate delete_retention_ms{disable_tristate}; // Should we delete the corresponding iceberg table when deleting the topic. std::optional iceberg_delete; + std::optional iceberg_partition_spec; bool is_compacted() const; bool has_overrides() const; @@ -241,7 +244,8 @@ struct topic_properties leaders_preference, cloud_topic_enabled, delete_retention_ms, - iceberg_delete); + iceberg_delete, + iceberg_partition_spec); } friend bool operator==(const topic_properties&, const topic_properties&) diff --git a/src/v/compat/cluster_generator.h b/src/v/compat/cluster_generator.h index 08b598bf489e..018296e45def 100644 --- a/src/v/compat/cluster_generator.h +++ b/src/v/compat/cluster_generator.h @@ -655,6 +655,7 @@ struct instance_generator { std::nullopt, false, tristate{disable_tristate}, + std::nullopt, std::nullopt}; } diff --git a/tools/offline_log_viewer/controller.py b/tools/offline_log_viewer/controller.py index 687f6832f83a..79da46f94989 100644 --- a/tools/offline_log_viewer/controller.py +++ b/tools/offline_log_viewer/controller.py @@ -144,6 +144,10 @@ def read_topic_properties_serde(rdr: Reader, version): 'delete_retention_ms': rdr.read_tristate(Reader.read_int64), 'iceberg_delete': rdr.read_optional(Reader.read_bool), } + if version >= 11: + topic_properties |= { + 'iceberg_partition_spec': rdr.read_optional(Reader.read_string), + } return topic_properties From 00f0e4f3bcb205fd3a1a66ed621e1daf196ddc3f Mon Sep 17 00:00:00 2001 From: Alexey Zatelepin Date: Thu, 16 Jan 2025 15:13:48 +0100 Subject: [PATCH 03/15] k/create_topics: added handling for iceberg_partition_spec property --- src/v/kafka/server/handlers/create_topics.cc | 3 ++- src/v/kafka/server/handlers/topics/types.cc | 3 +++ src/v/kafka/server/handlers/topics/types.h | 3 +++ 3 files changed, 8 insertions(+), 1 deletion(-) diff --git a/src/v/kafka/server/handlers/create_topics.cc b/src/v/kafka/server/handlers/create_topics.cc index 56db192b3ad2..d27dd9d6a9f3 100644 --- a/src/v/kafka/server/handlers/create_topics.cc +++ b/src/v/kafka/server/handlers/create_topics.cc @@ -76,7 +76,8 @@ bool is_supported(std::string_view name) { topic_property_iceberg_mode, topic_property_leaders_preference, topic_property_delete_retention_ms, - topic_property_iceberg_delete}); + topic_property_iceberg_delete, + topic_property_iceberg_partition_spec}); if (std::any_of( supported_configs.begin(), diff --git a/src/v/kafka/server/handlers/topics/types.cc b/src/v/kafka/server/handlers/topics/types.cc index 3e1d42941d01..dc5d1d3dac2a 100644 --- a/src/v/kafka/server/handlers/topics/types.cc +++ b/src/v/kafka/server/handlers/topics/types.cc @@ -272,6 +272,9 @@ to_cluster_type(const creatable_topic& t) { cfg.properties.iceberg_delete = get_bool_value( config_entries, topic_property_iceberg_delete); + cfg.properties.iceberg_partition_spec = get_string_value( + config_entries, topic_property_iceberg_partition_spec); + schema_id_validation_config_parser schema_id_validation_config_parser{ cfg.properties}; diff --git a/src/v/kafka/server/handlers/topics/types.h b/src/v/kafka/server/handlers/topics/types.h index ab3304353791..4223896b046c 100644 --- a/src/v/kafka/server/handlers/topics/types.h +++ b/src/v/kafka/server/handlers/topics/types.h @@ -108,6 +108,9 @@ inline constexpr std::string_view topic_property_cloud_topic_enabled inline constexpr std::string_view topic_property_iceberg_delete = "redpanda.iceberg.delete"; +inline constexpr std::string_view topic_property_iceberg_partition_spec + = "redpanda.iceberg.partition.spec"; + // Kafka topic properties that is not relevant for Redpanda // Or cannot be altered with kafka alter handler inline constexpr std::array allowlist_topic_noop_confs = { From d5dc7f1f0b83b29699c817fd4b770e205709572f Mon Sep 17 00:00:00 2001 From: Alexey Zatelepin Date: Thu, 16 Jan 2025 15:24:46 +0100 Subject: [PATCH 04/15] k/describe_configs: add handling for iceberg_partition_spec topic config --- .../handlers/configs/config_response_utils.cc | 15 +++++++++++++++ tests/rptest/tests/describe_topics_test.py | 6 ++++++ 2 files changed, 21 insertions(+) diff --git a/src/v/kafka/server/handlers/configs/config_response_utils.cc b/src/v/kafka/server/handlers/configs/config_response_utils.cc index dadf54560768..75cea61d922e 100644 --- a/src/v/kafka/server/handlers/configs/config_response_utils.cc +++ b/src/v/kafka/server/handlers/configs/config_response_utils.cc @@ -992,6 +992,21 @@ config_response_container_t make_topic_configs( "topic."), &describe_as_string); + if (topic_properties.iceberg_mode != model::iceberg_mode::disabled) { + add_topic_config_if_requested( + config_keys, + result, + config::shard_local_cfg().iceberg_default_partition_spec.name(), + config::shard_local_cfg().iceberg_default_partition_spec(), + topic_property_iceberg_partition_spec, + topic_properties.iceberg_partition_spec, + include_synonyms, + maybe_make_documentation( + include_documentation, + "Partition spec of the corresponding Iceberg table."), + &describe_as_string); + } + return result; } diff --git a/tests/rptest/tests/describe_topics_test.py b/tests/rptest/tests/describe_topics_test.py index 2ce2b80f2d7e..33083dc9b366 100644 --- a/tests/rptest/tests/describe_topics_test.py +++ b/tests/rptest/tests/describe_topics_test.py @@ -301,6 +301,12 @@ def test_describe_topics_with_documentation_and_types(self): doc_string= "If true, delete the corresponding Iceberg table when deleting the topic." ), + "redpanda.iceberg.partition.spec": + ConfigProperty( + config_type="STRING", + value="", + doc_string="Partition spec of the corresponding Iceberg table." + ), } tp_spec = TopicSpec() From ddf0f427bfd4eeffdd28cb79661e305ce1513f18 Mon Sep 17 00:00:00 2001 From: Alexey Zatelepin Date: Thu, 16 Jan 2025 15:39:01 +0100 Subject: [PATCH 05/15] k/alter_configs: add handling for iceberg_partition_spec topic property --- src/v/cluster/topic_table.cc | 3 +++ src/v/cluster/types.cc | 6 ++++-- src/v/cluster/types.h | 6 ++++-- src/v/kafka/server/handlers/alter_configs.cc | 9 ++++++++- src/v/kafka/server/handlers/incremental_alter_configs.cc | 5 +++++ tools/offline_log_viewer/controller.py | 5 +++++ 6 files changed, 29 insertions(+), 5 deletions(-) diff --git a/src/v/cluster/topic_table.cc b/src/v/cluster/topic_table.cc index 462bfba208f6..e06a44055ec4 100644 --- a/src/v/cluster/topic_table.cc +++ b/src/v/cluster/topic_table.cc @@ -1101,6 +1101,9 @@ topic_properties topic_table::update_topic_properties( updated_properties.delete_retention_ms, overrides.delete_retention_ms); incremental_update( updated_properties.iceberg_delete, overrides.iceberg_delete); + incremental_update( + updated_properties.iceberg_partition_spec, + overrides.iceberg_partition_spec); return updated_properties; } diff --git a/src/v/cluster/types.cc b/src/v/cluster/types.cc index f99c0df16126..49d6595c0135 100644 --- a/src/v/cluster/types.cc +++ b/src/v/cluster/types.cc @@ -386,7 +386,8 @@ std::ostream& operator<<(std::ostream& o, const incremental_topic_updates& i) { "initial_retention_local_target_bytes: {}, " "initial_retention_local_target_ms: {}, write_caching: {}, flush_ms: {}, " "flush_bytes: {}, iceberg_enabled: {}, leaders_preference: {}, " - "remote_read: {}, remote_write: {}, iceberg_delete: {}", + "remote_read: {}, remote_write: {}, iceberg_delete: {}, " + "iceberg_partition_spec: {}", i.compression, i.cleanup_policy_bitflags, i.compaction_strategy, @@ -417,7 +418,8 @@ std::ostream& operator<<(std::ostream& o, const incremental_topic_updates& i) { i.leaders_preference, i.remote_read, i.remote_write, - i.iceberg_delete); + i.iceberg_delete, + i.iceberg_partition_spec); return o; } diff --git a/src/v/cluster/types.h b/src/v/cluster/types.h index ef851afedc35..9684aac6889c 100644 --- a/src/v/cluster/types.h +++ b/src/v/cluster/types.h @@ -568,7 +568,7 @@ struct property_update> struct incremental_topic_updates : serde::envelope< incremental_topic_updates, - serde::version<7>, + serde::version<8>, serde::compat_version<0>> { static constexpr int8_t version_with_data_policy = -1; static constexpr int8_t version_with_shadow_indexing = -3; @@ -641,6 +641,7 @@ struct incremental_topic_updates leaders_preference; property_update> delete_retention_ms; property_update> iceberg_delete; + property_update> iceberg_partition_spec; // To allow us to better control use of the deprecated shadow_indexing // field, use getters and setters instead. @@ -680,7 +681,8 @@ struct incremental_topic_updates remote_read, remote_write, delete_retention_ms, - iceberg_delete); + iceberg_delete, + iceberg_partition_spec); } friend std::ostream& diff --git a/src/v/kafka/server/handlers/alter_configs.cc b/src/v/kafka/server/handlers/alter_configs.cc index 3dddacada140..8d4c6d089328 100644 --- a/src/v/kafka/server/handlers/alter_configs.cc +++ b/src/v/kafka/server/handlers/alter_configs.cc @@ -83,7 +83,7 @@ create_topic_properties_update( std::apply(apply_op(op_t::none), update.custom_properties.serde_fields()); static_assert( - std::tuple_size_v == 32, + std::tuple_size_v == 33, "If you added a property, please decide on it's default alter config " "policy, and handle the update in the loop below"); static_assert( @@ -370,6 +370,13 @@ create_topic_properties_update( kafka::config_resource_operation::set); continue; } + if (cfg.name == topic_property_iceberg_partition_spec) { + parse_and_set_optional( + update.properties.iceberg_partition_spec, + cfg.value, + kafka::config_resource_operation::set); + continue; + } } catch (const validation_error& e) { return make_error_alter_config_resource_response< diff --git a/src/v/kafka/server/handlers/incremental_alter_configs.cc b/src/v/kafka/server/handlers/incremental_alter_configs.cc index 30f866fe71e9..1c2c64979260 100644 --- a/src/v/kafka/server/handlers/incremental_alter_configs.cc +++ b/src/v/kafka/server/handlers/incremental_alter_configs.cc @@ -374,6 +374,11 @@ create_topic_properties_update( update.properties.iceberg_delete, cfg.value, op); continue; } + if (cfg.name == topic_property_iceberg_partition_spec) { + parse_and_set_optional( + update.properties.iceberg_partition_spec, cfg.value, op); + continue; + } } catch (const validation_error& e) { vlog( diff --git a/tools/offline_log_viewer/controller.py b/tools/offline_log_viewer/controller.py index 79da46f94989..c2bb77e2a71b 100644 --- a/tools/offline_log_viewer/controller.py +++ b/tools/offline_log_viewer/controller.py @@ -312,6 +312,11 @@ def incr_topic_upd(rdr: Reader, version): rdr.read_optional(read_leaders_preference), 'iceberg_delete': rdr.read_optional(Reader.read_bool), } + if version >= 8: + incr_obj |= { + 'iceberg_partition_spec': + rdr.read_optional(Reader.read_string), + } return incr_obj From 3e900652b25dd4bc9733921df00b3d3a8922043f Mon Sep 17 00:00:00 2001 From: Alexey Zatelepin Date: Mon, 20 Jan 2025 20:14:47 +0100 Subject: [PATCH 06/15] iceberg: split unresolved_partition_spec into its own lib This is needed to be able to use this lib (that doesn't depend on most of the iceberg proper) for config validation. --- src/v/iceberg/BUILD | 18 ++++++++++++ src/v/iceberg/CMakeLists.txt | 13 ++++++++- src/v/iceberg/partition.cc | 16 ----------- src/v/iceberg/partition.h | 18 +----------- src/v/iceberg/unresolved_partition_spec.cc | 30 ++++++++++++++++++++ src/v/iceberg/unresolved_partition_spec.h | 33 ++++++++++++++++++++++ 6 files changed, 94 insertions(+), 34 deletions(-) create mode 100644 src/v/iceberg/unresolved_partition_spec.cc create mode 100644 src/v/iceberg/unresolved_partition_spec.h diff --git a/src/v/iceberg/BUILD b/src/v/iceberg/BUILD index c782573dd31e..040a01c9e49e 100644 --- a/src/v/iceberg/BUILD +++ b/src/v/iceberg/BUILD @@ -513,6 +513,7 @@ redpanda_cc_library( deps = [ ":datatypes", ":transform", + ":unresolved_partition_spec", "//src/v/container:fragmented_vector", "//src/v/utils:named_type", "@seastar", @@ -1036,6 +1037,23 @@ redpanda_cc_library( ], ) +redpanda_cc_library( + name = "unresolved_partition_spec", + srcs = [ + "unresolved_partition_spec.cc", + ], + hdrs = [ + "unresolved_partition_spec.h", + ], + include_prefix = "iceberg", + visibility = ["//visibility:public"], + deps = [ + ":transform", + "//src/v/container:fragmented_vector", + "@seastar", + ], +) + redpanda_cc_library( name = "update_schema_action", srcs = [ diff --git a/src/v/iceberg/CMakeLists.txt b/src/v/iceberg/CMakeLists.txt index 957b0da1c929..6fa6df695d18 100644 --- a/src/v/iceberg/CMakeLists.txt +++ b/src/v/iceberg/CMakeLists.txt @@ -24,6 +24,17 @@ foreach (schema ${schemas}) list(APPEND avro_hdrs ${avro_hdr}) endforeach() +v_cc_library( + NAME iceberg_unresolved_partition_spec + SRCS + transform.cc + unresolved_partition_spec.cc + DEPS + Seastar::seastar + v::base + v::container +) + v_cc_library( NAME iceberg SRCS @@ -67,7 +78,6 @@ v_cc_library( time_transform_visitor.cc table_identifier.cc transaction.cc - transform.cc transform_json.cc transform_utils.cc update_schema_action.cc @@ -86,6 +96,7 @@ v_cc_library( v::bytes v::cloud_io v::container + v::iceberg_unresolved_partition_spec v::json v::strings v::utils diff --git a/src/v/iceberg/partition.cc b/src/v/iceberg/partition.cc index 2a6ca7103223..1b7ba1a84e99 100644 --- a/src/v/iceberg/partition.cc +++ b/src/v/iceberg/partition.cc @@ -11,22 +11,6 @@ namespace iceberg { -std::ostream& -operator<<(std::ostream& o, const unresolved_partition_spec::field& f) { - fmt::print( - o, - "{{source_name: {}, transform: {}, name: {}}}", - f.source_name, - f.transform, - f.name); - return o; -} - -std::ostream& operator<<(std::ostream& o, const unresolved_partition_spec& ps) { - fmt::print(o, "{{fields: {}}}", ps.fields); - return o; -} - std::ostream& operator<<(std::ostream& o, const partition_field& f) { fmt::print( o, diff --git a/src/v/iceberg/partition.h b/src/v/iceberg/partition.h index cf1ccad58f0e..9a814ad315f8 100644 --- a/src/v/iceberg/partition.h +++ b/src/v/iceberg/partition.h @@ -11,29 +11,13 @@ #include "container/fragmented_vector.h" #include "iceberg/datatypes.h" #include "iceberg/transform.h" +#include "iceberg/unresolved_partition_spec.h" #include "utils/named_type.h" #include namespace iceberg { -struct unresolved_partition_spec { - struct field { - // Components of the nested source field name, in increasing depth - // order. - std::vector source_name; - transform transform; - ss::sstring name; - - friend std::ostream& operator<<(std::ostream&, const field&); - }; - - chunked_vector fields; - - friend std::ostream& - operator<<(std::ostream&, const unresolved_partition_spec&); -}; - struct partition_field { using id_t = named_type; nested_field::id_t source_id; diff --git a/src/v/iceberg/unresolved_partition_spec.cc b/src/v/iceberg/unresolved_partition_spec.cc new file mode 100644 index 000000000000..1e0f072560dd --- /dev/null +++ b/src/v/iceberg/unresolved_partition_spec.cc @@ -0,0 +1,30 @@ +// Copyright 2024 Redpanda Data, Inc. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.md +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0 + +#include "iceberg/unresolved_partition_spec.h" + +namespace iceberg { + +std::ostream& +operator<<(std::ostream& o, const unresolved_partition_spec::field& f) { + fmt::print( + o, + "{{source_name: {}, transform: {}, name: {}}}", + f.source_name, + f.transform, + f.name); + return o; +} + +std::ostream& operator<<(std::ostream& o, const unresolved_partition_spec& ps) { + fmt::print(o, "{{fields: {}}}", ps.fields); + return o; +} + +} // namespace iceberg diff --git a/src/v/iceberg/unresolved_partition_spec.h b/src/v/iceberg/unresolved_partition_spec.h new file mode 100644 index 000000000000..11d543565961 --- /dev/null +++ b/src/v/iceberg/unresolved_partition_spec.h @@ -0,0 +1,33 @@ +// Copyright 2024 Redpanda Data, Inc. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.md +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0 +#pragma once + +#include "container/fragmented_vector.h" +#include "iceberg/transform.h" + +namespace iceberg { + +struct unresolved_partition_spec { + struct field { + // Components of the nested source field name, in increasing depth + // order. + std::vector source_name; + transform transform; + ss::sstring name; + + friend std::ostream& operator<<(std::ostream&, const field&); + }; + + chunked_vector fields; + + friend std::ostream& + operator<<(std::ostream&, const unresolved_partition_spec&); +}; + +} // namespace iceberg From 52f7b0ad463e58784b3bec5ab84be49567d453ec Mon Sep 17 00:00:00 2001 From: Alexey Zatelepin Date: Thu, 16 Jan 2025 20:40:10 +0100 Subject: [PATCH 07/15] iceberg: simple parser for unresolved_partition_spec --- src/v/iceberg/tests/partition_test.cc | 59 +++++ src/v/iceberg/unresolved_partition_spec.cc | 238 +++++++++++++++++++++ src/v/iceberg/unresolved_partition_spec.h | 8 + 3 files changed, 305 insertions(+) diff --git a/src/v/iceberg/tests/partition_test.cc b/src/v/iceberg/tests/partition_test.cc index 51e31ef7c5c5..6f42245d2bd3 100644 --- a/src/v/iceberg/tests/partition_test.cc +++ b/src/v/iceberg/tests/partition_test.cc @@ -234,3 +234,62 @@ TEST(PartitionTest, TestSpecResolve) { ASSERT_EQ(resolved, std::nullopt); } } + +TEST(PartitionTest, TestSpecParse) { + { + auto res = unresolved_partition_spec::parse("()"); + ASSERT_TRUE(res); + ASSERT_EQ(res.value(), unresolved_partition_spec{}); + } + + { + auto res = unresolved_partition_spec::parse("(foo)"); + ASSERT_TRUE(res); + auto expected = chunked_vector{ + unresolved_partition_spec::field{ + .source_name = {"foo"}, + .transform = identity_transform{}, + .name = "foo"}, + }; + ASSERT_EQ( + res.value(), + unresolved_partition_spec{.fields = std::move(expected)}); + } + + { + auto res = unresolved_partition_spec::parse(" (foo.bar, baz ) "); + ASSERT_TRUE(res); + auto expected = chunked_vector{ + unresolved_partition_spec::field{ + .source_name = {"foo", "bar"}, + .transform = identity_transform{}, + .name = "foo.bar"}, + unresolved_partition_spec::field{ + .source_name = {"baz"}, + .transform = identity_transform{}, + .name = "baz"}, + }; + ASSERT_EQ( + res.value(), + unresolved_partition_spec{.fields = std::move(expected)}); + } + + { + auto res = unresolved_partition_spec::parse( + " (hour(redpanda.timestamp), day(my_ts) as my_day )"); + ASSERT_TRUE(res); + auto expected = chunked_vector{ + unresolved_partition_spec::field{ + .source_name = {"redpanda", "timestamp"}, + .transform = hour_transform{}, + .name = "redpanda.timestamp_hour"}, + unresolved_partition_spec::field{ + .source_name = {"my_ts"}, + .transform = day_transform{}, + .name = "my_day"}, + }; + ASSERT_EQ( + res.value(), + unresolved_partition_spec{.fields = std::move(expected)}); + } +} diff --git a/src/v/iceberg/unresolved_partition_spec.cc b/src/v/iceberg/unresolved_partition_spec.cc index 1e0f072560dd..373f5e6993e2 100644 --- a/src/v/iceberg/unresolved_partition_spec.cc +++ b/src/v/iceberg/unresolved_partition_spec.cc @@ -27,4 +27,242 @@ std::ostream& operator<<(std::ostream& o, const unresolved_partition_spec& ps) { return o; } +namespace { + +template +struct parse_result { + T val; + std::string_view unparsed; +}; + +bool skip_space(std::string_view& str) { + auto it = str.begin(); + while (it != str.end() && std::isspace(*it)) { + ++it; + } + const bool skipped = it != str.begin(); + str = std::string_view{it, str.end()}; + return skipped; +} + +bool skip_expected(std::string_view& str, const std::string_view& expected) { + if (!str.starts_with(expected)) { + return false; + } + str.remove_prefix(expected.length()); + return true; +} + +std::optional> +parse_identifier(const std::string_view& str) { + auto it = str.begin(); + while (it != str.end() && (*it == '_' || std::isalnum(*it))) { + ++it; + } + + if (it == str.begin()) { + return std::nullopt; + } + + return parse_result{ + .val = ss::sstring{str.begin(), it}, + .unparsed = std::string_view{it, str.end()}, + }; +} + +std::optional>> +parse_qualified_identifier(const std::string_view& str) { + auto unparsed = str; + + std::vector result; + while (true) { + if (!result.empty()) { + if (!skip_expected(unparsed, ".")) { + break; + } + } + + auto id = parse_identifier(unparsed); + if (!id) { + break; + } + result.push_back(id->val); + unparsed = id->unparsed; + } + + if (result.empty()) { + return std::nullopt; + } + + return parse_result>{ + .val = std::move(result), + .unparsed = unparsed, + }; +} + +struct transform_field { + std::vector source; + transform transform; +}; + +std::optional> +parse_transform_field(const std::string_view& str) { + auto unparsed = str; + + auto transform_id = parse_identifier(unparsed); + if (!transform_id) { + return std::nullopt; + } + transform transform; + if (transform_id->val == "hour") { + transform = hour_transform{}; + } else if (transform_id->val == "day") { + transform = day_transform{}; + } else if (transform_id->val == "identity") { + transform = identity_transform{}; + } else { + return std::nullopt; + } + unparsed = transform_id->unparsed; + + skip_space(unparsed); + if (!skip_expected(unparsed, "(")) { + return std::nullopt; + } + + auto source = parse_qualified_identifier(unparsed); + if (!source) { + return std::nullopt; + } + unparsed = source->unparsed; + + skip_space(unparsed); + if (!skip_expected(unparsed, ")")) { + return std::nullopt; + } + + auto result = transform_field{ + .source = std::move(source->val), + .transform = transform, + }; + + return parse_result{ + .val = std::move(result), + .unparsed = unparsed, + }; +} + +std::optional> +parse_partition_field(const std::string_view& str) { + auto unparsed = str; + skip_space(unparsed); + + transform_field tf; + if (auto parsed_tf = parse_transform_field(unparsed); parsed_tf) { + tf = std::move(parsed_tf->val); + unparsed = parsed_tf->unparsed; + } else if (auto parsed_sf = parse_qualified_identifier(unparsed); + parsed_sf) { + tf.source = std::move(parsed_sf->val); + tf.transform = identity_transform{}; + unparsed = parsed_sf->unparsed; + } else { + return std::nullopt; + } + + ss::sstring source_field_str; + if ( + skip_space(unparsed) + && (skip_expected(unparsed, "AS") || skip_expected(unparsed, "as"))) { + if (!skip_space(unparsed)) { + return std::nullopt; + } + + auto id = parse_identifier(unparsed); + if (!id) { + return std::nullopt; + } + source_field_str = std::move(id->val); + unparsed = id->unparsed; + } else { + // TODO: better + bool first = true; + for (const auto& id : tf.source) { + if (!first) { + source_field_str += "."; + } else { + first = false; + } + source_field_str += id; + } + + if (tf.transform != identity_transform{}) { + source_field_str += fmt::format("_{}", tf.transform); + } + } + + unresolved_partition_spec::field val{ + .source_name = std::move(tf.source), + .transform = tf.transform, + .name = std::move(source_field_str), + }; + + return parse_result{ + .val = std::move(val), + .unparsed = unparsed, + }; +} + +std::optional> +parse_partition_field_list(const std::string_view& str) { + auto unparsed = str; + skip_space(unparsed); + + if (!skip_expected(unparsed, "(")) { + return std::nullopt; + } + + unresolved_partition_spec result; + while (true) { + if (!result.fields.empty()) { + skip_space(unparsed); + if (!skip_expected(unparsed, ",")) { + break; + } + } + + auto field = parse_partition_field(unparsed); + if (!field) { + break; + } + result.fields.push_back(field->val); + unparsed = field->unparsed; + } + + skip_space(unparsed); + if (!skip_expected(unparsed, ")")) { + return std::nullopt; + } + + return parse_result{ + .val = std::move(result), + .unparsed = unparsed, + }; +} + +} // namespace + +std::optional +unresolved_partition_spec::parse(const std::string_view& str) { + auto res = parse_partition_field_list(str); + if (!res) { + return std::nullopt; + } + skip_space(res->unparsed); + if (!res->unparsed.empty()) { + return std::nullopt; + } + return std::move(res->val); +} + } // namespace iceberg diff --git a/src/v/iceberg/unresolved_partition_spec.h b/src/v/iceberg/unresolved_partition_spec.h index 11d543565961..b100f75de7cd 100644 --- a/src/v/iceberg/unresolved_partition_spec.h +++ b/src/v/iceberg/unresolved_partition_spec.h @@ -21,13 +21,21 @@ struct unresolved_partition_spec { transform transform; ss::sstring name; + friend bool operator==(const field&, const field&) = default; friend std::ostream& operator<<(std::ostream&, const field&); }; chunked_vector fields; + friend bool operator==( + const unresolved_partition_spec&, const unresolved_partition_spec&) + = default; + friend std::ostream& operator<<(std::ostream&, const unresolved_partition_spec&); + + static std::optional + parse(const std::string_view&); }; } // namespace iceberg From 6dd4b89560c13d265256ae15d4aefebbfd00651c Mon Sep 17 00:00:00 2001 From: Alexey Zatelepin Date: Fri, 17 Jan 2025 13:00:04 +0100 Subject: [PATCH 08/15] iceberg: simple error reporting for partition spec parsing --- src/v/iceberg/BUILD | 1 + src/v/iceberg/tests/partition_test.cc | 8 ++--- src/v/iceberg/unresolved_partition_spec.cc | 42 +++++++++++++++++----- src/v/iceberg/unresolved_partition_spec.h | 3 +- 4 files changed, 40 insertions(+), 14 deletions(-) diff --git a/src/v/iceberg/BUILD b/src/v/iceberg/BUILD index 040a01c9e49e..cef541dfb059 100644 --- a/src/v/iceberg/BUILD +++ b/src/v/iceberg/BUILD @@ -1049,6 +1049,7 @@ redpanda_cc_library( visibility = ["//visibility:public"], deps = [ ":transform", + "//src/v/base", "//src/v/container:fragmented_vector", "@seastar", ], diff --git a/src/v/iceberg/tests/partition_test.cc b/src/v/iceberg/tests/partition_test.cc index 6f42245d2bd3..cadba38520ac 100644 --- a/src/v/iceberg/tests/partition_test.cc +++ b/src/v/iceberg/tests/partition_test.cc @@ -238,13 +238,13 @@ TEST(PartitionTest, TestSpecResolve) { TEST(PartitionTest, TestSpecParse) { { auto res = unresolved_partition_spec::parse("()"); - ASSERT_TRUE(res); + ASSERT_FALSE(res.has_error()) << res.error(); ASSERT_EQ(res.value(), unresolved_partition_spec{}); } { auto res = unresolved_partition_spec::parse("(foo)"); - ASSERT_TRUE(res); + ASSERT_FALSE(res.has_error()) << res.error(); auto expected = chunked_vector{ unresolved_partition_spec::field{ .source_name = {"foo"}, @@ -258,7 +258,7 @@ TEST(PartitionTest, TestSpecParse) { { auto res = unresolved_partition_spec::parse(" (foo.bar, baz ) "); - ASSERT_TRUE(res); + ASSERT_FALSE(res.has_error()) << res.error(); auto expected = chunked_vector{ unresolved_partition_spec::field{ .source_name = {"foo", "bar"}, @@ -277,7 +277,7 @@ TEST(PartitionTest, TestSpecParse) { { auto res = unresolved_partition_spec::parse( " (hour(redpanda.timestamp), day(my_ts) as my_day )"); - ASSERT_TRUE(res); + ASSERT_FALSE(res.has_error()) << res.error(); auto expected = chunked_vector{ unresolved_partition_spec::field{ .source_name = {"redpanda", "timestamp"}, diff --git a/src/v/iceberg/unresolved_partition_spec.cc b/src/v/iceberg/unresolved_partition_spec.cc index 373f5e6993e2..98eefceefcc8 100644 --- a/src/v/iceberg/unresolved_partition_spec.cc +++ b/src/v/iceberg/unresolved_partition_spec.cc @@ -35,6 +35,19 @@ struct parse_result { std::string_view unparsed; }; +struct parse_ctx { + std::string_view original; + ss::sstring last_error; + + void report_expected(std::string_view unparsed, std::string_view expected) { + last_error = fmt::format( + "col {}: expected {} (got instead: `{}')", + unparsed.data() - original.data(), + expected, + unparsed); + } +}; + bool skip_space(std::string_view& str) { auto it = str.begin(); while (it != str.end() && std::isspace(*it)) { @@ -106,7 +119,7 @@ struct transform_field { }; std::optional> -parse_transform_field(const std::string_view& str) { +parse_transform_field(const std::string_view& str, parse_ctx& ctx) { auto unparsed = str; auto transform_id = parse_identifier(unparsed); @@ -121,23 +134,27 @@ parse_transform_field(const std::string_view& str) { } else if (transform_id->val == "identity") { transform = identity_transform{}; } else { + ctx.report_expected(unparsed, "known transform name"); return std::nullopt; } unparsed = transform_id->unparsed; skip_space(unparsed); if (!skip_expected(unparsed, "(")) { + ctx.report_expected(unparsed, "'('"); return std::nullopt; } auto source = parse_qualified_identifier(unparsed); if (!source) { + ctx.report_expected(unparsed, "qualified identifier"); return std::nullopt; } unparsed = source->unparsed; skip_space(unparsed); if (!skip_expected(unparsed, ")")) { + ctx.report_expected(unparsed, "')'"); return std::nullopt; } @@ -153,12 +170,12 @@ parse_transform_field(const std::string_view& str) { } std::optional> -parse_partition_field(const std::string_view& str) { +parse_partition_field(const std::string_view& str, parse_ctx& ctx) { auto unparsed = str; skip_space(unparsed); transform_field tf; - if (auto parsed_tf = parse_transform_field(unparsed); parsed_tf) { + if (auto parsed_tf = parse_transform_field(unparsed, ctx); parsed_tf) { tf = std::move(parsed_tf->val); unparsed = parsed_tf->unparsed; } else if (auto parsed_sf = parse_qualified_identifier(unparsed); @@ -167,6 +184,7 @@ parse_partition_field(const std::string_view& str) { tf.transform = identity_transform{}; unparsed = parsed_sf->unparsed; } else { + ctx.report_expected(unparsed, "qualified identifier or transform"); return std::nullopt; } @@ -175,11 +193,13 @@ parse_partition_field(const std::string_view& str) { skip_space(unparsed) && (skip_expected(unparsed, "AS") || skip_expected(unparsed, "as"))) { if (!skip_space(unparsed)) { + ctx.report_expected(unparsed, "whitespace"); return std::nullopt; } auto id = parse_identifier(unparsed); if (!id) { + ctx.report_expected(unparsed, "identifier"); return std::nullopt; } source_field_str = std::move(id->val); @@ -214,11 +234,12 @@ parse_partition_field(const std::string_view& str) { } std::optional> -parse_partition_field_list(const std::string_view& str) { +parse_partition_field_list(const std::string_view& str, parse_ctx& ctx) { auto unparsed = str; skip_space(unparsed); if (!skip_expected(unparsed, "(")) { + ctx.report_expected(unparsed, "'('"); return std::nullopt; } @@ -227,11 +248,12 @@ parse_partition_field_list(const std::string_view& str) { if (!result.fields.empty()) { skip_space(unparsed); if (!skip_expected(unparsed, ",")) { + ctx.report_expected(unparsed, ","); break; } } - auto field = parse_partition_field(unparsed); + auto field = parse_partition_field(unparsed, ctx); if (!field) { break; } @@ -241,6 +263,7 @@ parse_partition_field_list(const std::string_view& str) { skip_space(unparsed); if (!skip_expected(unparsed, ")")) { + ctx.report_expected(unparsed, "')'"); return std::nullopt; } @@ -252,15 +275,16 @@ parse_partition_field_list(const std::string_view& str) { } // namespace -std::optional +checked unresolved_partition_spec::parse(const std::string_view& str) { - auto res = parse_partition_field_list(str); + parse_ctx ctx{.original = str}; + auto res = parse_partition_field_list(str, ctx); if (!res) { - return std::nullopt; + return ctx.last_error; } skip_space(res->unparsed); if (!res->unparsed.empty()) { - return std::nullopt; + return fmt::format("unparsed: `{}'", res->unparsed); } return std::move(res->val); } diff --git a/src/v/iceberg/unresolved_partition_spec.h b/src/v/iceberg/unresolved_partition_spec.h index b100f75de7cd..cbf9cbeb3200 100644 --- a/src/v/iceberg/unresolved_partition_spec.h +++ b/src/v/iceberg/unresolved_partition_spec.h @@ -8,6 +8,7 @@ // by the Apache License, Version 2.0 #pragma once +#include "base/outcome.h" #include "container/fragmented_vector.h" #include "iceberg/transform.h" @@ -34,7 +35,7 @@ struct unresolved_partition_spec { friend std::ostream& operator<<(std::ostream&, const unresolved_partition_spec&); - static std::optional + static checked parse(const std::string_view&); }; From eab573c879eb6b3fb5168d16b455bde478e16702 Mon Sep 17 00:00:00 2001 From: Alexey Zatelepin Date: Mon, 20 Jan 2025 13:43:00 +0100 Subject: [PATCH 09/15] datalake: plumb default_partition_spec property into coordinator --- src/v/datalake/coordinator/coordinator.h | 7 +++++-- src/v/datalake/coordinator/coordinator_manager.cc | 3 ++- src/v/datalake/coordinator/tests/coordinator_test.cc | 5 ++++- src/v/datalake/coordinator/tests/state_machine_test.cc | 7 ++++++- 4 files changed, 17 insertions(+), 5 deletions(-) diff --git a/src/v/datalake/coordinator/coordinator.h b/src/v/datalake/coordinator/coordinator.h index 09a826bbd57f..f052e2b8d03f 100644 --- a/src/v/datalake/coordinator/coordinator.h +++ b/src/v/datalake/coordinator/coordinator.h @@ -45,14 +45,16 @@ class coordinator { schema_manager& schema_mgr, remove_tombstone_f remove_tombstone, file_committer& file_committer, - config::binding commit_interval) + config::binding commit_interval, + config::binding default_partition_spec) : stm_(std::move(stm)) , topic_table_(topics) , type_resolver_(type_resolver) , schema_mgr_(schema_mgr) , remove_tombstone_(std::move(remove_tombstone)) , file_committer_(file_committer) - , commit_interval_(std::move(commit_interval)) {} + , commit_interval_(std::move(commit_interval)) + , default_partition_spec_(std::move(default_partition_spec)) {} void start(); ss::future<> stop_and_wait(); @@ -105,6 +107,7 @@ class coordinator { remove_tombstone_f remove_tombstone_; file_committer& file_committer_; config::binding commit_interval_; + config::binding default_partition_spec_; ss::gate gate_; ss::abort_source as_; diff --git a/src/v/datalake/coordinator/coordinator_manager.cc b/src/v/datalake/coordinator/coordinator_manager.cc index 26a76db26cb3..e020e14e5536 100644 --- a/src/v/datalake/coordinator/coordinator_manager.cc +++ b/src/v/datalake/coordinator/coordinator_manager.cc @@ -123,7 +123,8 @@ void coordinator_manager::start_managing(cluster::partition& p) { return remove_tombstone(t, rev); }, *file_committer_, - config::shard_local_cfg().iceberg_catalog_commit_interval_ms.bind()); + config::shard_local_cfg().iceberg_catalog_commit_interval_ms.bind(), + config::shard_local_cfg().iceberg_default_partition_spec.bind()); if (p.is_leader()) { crd->notify_leadership(self_); } diff --git a/src/v/datalake/coordinator/tests/coordinator_test.cc b/src/v/datalake/coordinator/tests/coordinator_test.cc index 3a17ca6fe45e..4c3fb09e3dad 100644 --- a/src/v/datalake/coordinator/tests/coordinator_test.cc +++ b/src/v/datalake/coordinator/tests/coordinator_test.cc @@ -73,7 +73,8 @@ struct coordinator_node { return remove_tombstone(t, r); }, *file_committer, - commit_interval_ms.bind()) {} + commit_interval_ms.bind(), + default_partition_spec.bind()) {} ss::future> remove_tombstone(const model::topic&, model::revision_id) { @@ -90,6 +91,8 @@ struct coordinator_node { coordinator_stm& stm; config::mock_property commit_interval_ms; + config::mock_property default_partition_spec{ + "(hour(redpanda.timestamp))"}; cluster::data_migrations::migrated_resources mr; cluster::topic_table topic_table; datalake::binary_type_resolver type_resolver; diff --git a/src/v/datalake/coordinator/tests/state_machine_test.cc b/src/v/datalake/coordinator/tests/state_machine_test.cc index 88de4b7b9d8e..2e2b5404ea12 100644 --- a/src/v/datalake/coordinator/tests/state_machine_test.cc +++ b/src/v/datalake/coordinator/tests/state_machine_test.cc @@ -37,6 +37,10 @@ struct coordinator_stm_fixture : stm_raft_fixture { return config::mock_binding(1s); } + config::binding default_partition_spec() const { + return config::mock_binding("(hour(redpanda.timestamp))"); + } + stm_shptrs_t create_stms( state_machine_manager_builder& builder, raft_node_instance& node) override { @@ -58,7 +62,8 @@ struct coordinator_stm_fixture : stm_raft_fixture { return remove_tombstone(t, r); }, file_committer, - commit_interval()); + commit_interval(), + default_partition_spec()); coordinators[node.get_vnode()]->start(); return ss::now(); }); From 8301b0898aa0432539cb694c55cafe3fff3d6032 Mon Sep 17 00:00:00 2001 From: Alexey Zatelepin Date: Mon, 20 Jan 2025 15:23:49 +0100 Subject: [PATCH 10/15] dl/coordinator: factor out common ensure_table_exists code No functional changes. --- src/v/datalake/coordinator/coordinator.cc | 151 +++++++++++----------- src/v/datalake/coordinator/coordinator.h | 10 ++ 2 files changed, 84 insertions(+), 77 deletions(-) diff --git a/src/v/datalake/coordinator/coordinator.cc b/src/v/datalake/coordinator/coordinator.cc index 333d10ca88a8..f51c2d799486 100644 --- a/src/v/datalake/coordinator/coordinator.cc +++ b/src/v/datalake/coordinator/coordinator.cc @@ -229,11 +229,22 @@ checked coordinator::maybe_gate() { return gate_.hold(); } +struct coordinator::table_schema_provider { + virtual iceberg::table_identifier get_id(const model::topic&) const = 0; + + virtual ss::future> + get_record_type(record_schema_components) const = 0; + + virtual ~table_schema_provider() = default; +}; + ss::future> -coordinator::sync_ensure_table_exists( +coordinator::do_ensure_table_exists( model::topic topic, model::revision_id topic_revision, - record_schema_components comps) { + record_schema_components comps, + std::string_view method_name, + const table_schema_provider& schema_provider) { auto gate = maybe_gate(); if (gate.has_error()) { co_return gate.error(); @@ -241,7 +252,8 @@ coordinator::sync_ensure_table_exists( vlog( datalake_log.debug, - "Sync ensure table exists requested, topic: {} rev: {}", + "{} requested, topic: {} rev: {}", + method_name, topic, topic_revision); @@ -261,15 +273,14 @@ coordinator::sync_ensure_table_exists( if (check_res.has_error()) { vlog( datalake_log.debug, - "Rejecting ensure_table_exist for {} rev {}: {}", + "Rejecting {} for {} rev {}: {}", + method_name, topic, topic_revision, check_res.error()); co_return errc::revision_mismatch; } - // Will skip the STM update if the topic is already live. - // But will still ensure the DLQ table schema. if (check_res.value()) { // update is non-trivial storage::record_batch_builder builder( @@ -286,21 +297,16 @@ coordinator::sync_ensure_table_exists( // TODO: verify stm state after replication - auto table_id = table_id_provider::table_id(topic); + auto table_id = schema_provider.get_id(topic); - std::optional val_type; - if (comps.val_identifier) { - auto type_res = co_await type_resolver_.resolve_identifier( - comps.val_identifier.value()); - if (type_res.has_error()) { - co_return errc::failed; - } - val_type = std::move(type_res.value()); + auto record_type = co_await schema_provider.get_record_type( + std::move(comps)); + if (!record_type.has_value()) { + co_return errc::failed; } - auto record_type = default_translator{}.build_type(std::move(val_type)); auto ensure_res = co_await schema_mgr_.ensure_table_schema( - table_id, record_type.type, hour_partition_spec()); + table_id, record_type.value(), hour_partition_spec()); if (ensure_res.has_error()) { switch (ensure_res.error()) { case schema_manager::errc::not_supported: @@ -315,77 +321,68 @@ coordinator::sync_ensure_table_exists( co_return std::nullopt; } -ss::future> -coordinator::sync_ensure_dlq_table_exists( - model::topic topic, model::revision_id topic_revision) { - auto gate = maybe_gate(); - if (gate.has_error()) { - co_return gate.error(); +struct coordinator::main_table_schema_provider + : public coordinator::table_schema_provider { + explicit main_table_schema_provider(coordinator& parent) + : parent(parent) {} + + iceberg::table_identifier get_id(const model::topic& topic) const final { + return table_id_provider::table_id(topic); } - vlog( - datalake_log.debug, - "Sync ensure dlq table exists requested, topic: {} rev: {}", - topic, - topic_revision); + ss::future> + get_record_type(record_schema_components comps) const final { + std::optional val_type; + if (comps.val_identifier) { + auto type_res = co_await parent.type_resolver_.resolve_identifier( + comps.val_identifier.value()); + if (type_res.has_error()) { + co_return errc::failed; + } + val_type = std::move(type_res.value()); + } - auto sync_res = co_await stm_->sync(10s); - if (sync_res.has_error()) { - co_return convert_stm_errc(sync_res.error()); + auto record_type = default_translator{}.build_type(std::move(val_type)); + co_return std::move(record_type.type); } - // TODO: add mutex to protect against the thundering herd problem + const coordinator& parent; +}; - topic_lifecycle_update update{ - .topic = topic, - .revision = topic_revision, - .new_state = topic_state::lifecycle_state_t::live, - }; - auto check_res = update.can_apply(stm_->state()); - if (check_res.has_error()) { - vlog( - datalake_log.debug, - "Rejecting ensure_dlq_table_exist for {} rev {}: {}", - topic, - topic_revision, - check_res.error()); - co_return errc::revision_mismatch; - } +ss::future> +coordinator::sync_ensure_table_exists( + model::topic topic, + model::revision_id topic_revision, + record_schema_components comps) { + co_return co_await do_ensure_table_exists( + topic, + topic_revision, + std::move(comps), + "sync_ensure_table_exists", + main_table_schema_provider{*this}); +} - // Will skip the STM update if the topic is already live. - // But will still ensure the DLQ table schema. - if (check_res.value()) { - // update is non-trivial - storage::record_batch_builder builder( - model::record_batch_type::datalake_coordinator, model::offset{0}); - builder.add_raw_kv( - serde::to_iobuf(topic_lifecycle_update::key), - serde::to_iobuf(std::move(update))); - auto repl_res = co_await stm_->replicate_and_wait( - sync_res.value(), std::move(builder).build(), as_); - if (repl_res.has_error()) { - co_return convert_stm_errc(repl_res.error()); - } +struct coordinator::dlq_table_schema_provider + : public coordinator::table_schema_provider { + iceberg::table_identifier get_id(const model::topic& topic) const final { + return table_id_provider::dlq_table_id(topic); } - // TODO: verify stm state after replication - - auto dlq_table_id = table_id_provider::dlq_table_id(topic); - auto record_type = key_value_translator{}.build_type(std::nullopt); - auto ensure_res = co_await schema_mgr_.ensure_table_schema( - dlq_table_id, record_type.type, hour_partition_spec()); - if (ensure_res.has_error()) { - switch (ensure_res.error()) { - case schema_manager::errc::not_supported: - co_return errc::incompatible_schema; - case schema_manager::errc::failed: - co_return errc::failed; - case schema_manager::errc::shutting_down: - co_return errc::shutting_down; - } + ss::future> + get_record_type(record_schema_components) const final { + co_return key_value_translator{}.build_type(std::nullopt).type; } +}; - co_return std::nullopt; +ss::future> +coordinator::sync_ensure_dlq_table_exists( + model::topic topic, model::revision_id topic_revision) { + co_return co_await do_ensure_table_exists( + topic, + topic_revision, + record_schema_components{}, + "sync_ensure_dlq_table_exists", + dlq_table_schema_provider{}); } ss::future> diff --git a/src/v/datalake/coordinator/coordinator.h b/src/v/datalake/coordinator/coordinator.h index f052e2b8d03f..b3bae41c1462 100644 --- a/src/v/datalake/coordinator/coordinator.h +++ b/src/v/datalake/coordinator/coordinator.h @@ -100,6 +100,16 @@ class coordinator { ss::future> update_lifecycle_state(const model::topic&, model::term_id); + struct table_schema_provider; + struct main_table_schema_provider; + struct dlq_table_schema_provider; + ss::future> do_ensure_table_exists( + model::topic, + model::revision_id topic_revision, + record_schema_components, + std::string_view method_name, + const table_schema_provider&); + ss::shared_ptr stm_; cluster::topic_table& topic_table_; type_resolver& type_resolver_; From 7d4ea330ead46caaabe64029b82b7d25eec842f2 Mon Sep 17 00:00:00 2001 From: Alexey Zatelepin Date: Mon, 20 Jan 2025 16:39:17 +0100 Subject: [PATCH 11/15] dl/coordinator: get partition spec from the cluster/topic config --- src/v/datalake/coordinator/coordinator.cc | 52 ++++++++++++++++++- .../coordinator/tests/coordinator_test.cc | 25 +++++++++ .../coordinator/tests/state_machine_test.cc | 17 ++++-- 3 files changed, 89 insertions(+), 5 deletions(-) diff --git a/src/v/datalake/coordinator/coordinator.cc b/src/v/datalake/coordinator/coordinator.cc index f51c2d799486..acb402a90fe4 100644 --- a/src/v/datalake/coordinator/coordinator.cc +++ b/src/v/datalake/coordinator/coordinator.cc @@ -235,6 +235,9 @@ struct coordinator::table_schema_provider { virtual ss::future> get_record_type(record_schema_components) const = 0; + virtual ss::sstring get_partition_spec(const cluster::topic_metadata&) const + = 0; + virtual ~table_schema_provider() = default; }; @@ -264,6 +267,35 @@ coordinator::do_ensure_table_exists( // TODO: add mutex to protect against the thundering herd problem + auto topic_md = topic_table_.get_topic_metadata_ref( + model::topic_namespace_view{model::kafka_namespace, topic}); + if (!topic_md || topic_md->get().get_revision() != topic_revision) { + vlog( + datalake_log.debug, + "Rejecting {} for {} rev {}, topic table revision {}", + method_name, + topic, + topic_revision, + topic_md ? std::optional{topic_md->get().get_revision()} + : std::nullopt); + co_return errc::revision_mismatch; + } + + auto partition_spec_str = schema_provider.get_partition_spec( + topic_md->get()); + auto partition_spec = iceberg::unresolved_partition_spec::parse( + partition_spec_str); + if (!partition_spec.has_value()) { + vlog( + datalake_log.warn, + "{} failed, couldn't parse partition spec for {} rev {}: {}", + method_name, + topic, + topic_revision, + partition_spec.error()); + co_return errc::failed; + } + topic_lifecycle_update update{ .topic = topic, .revision = topic_revision, @@ -306,7 +338,7 @@ coordinator::do_ensure_table_exists( } auto ensure_res = co_await schema_mgr_.ensure_table_schema( - table_id, record_type.value(), hour_partition_spec()); + table_id, record_type.value(), partition_spec.value()); if (ensure_res.has_error()) { switch (ensure_res.error()) { case schema_manager::errc::not_supported: @@ -346,6 +378,13 @@ struct coordinator::main_table_schema_provider co_return std::move(record_type.type); } + ss::sstring + get_partition_spec(const cluster::topic_metadata& topic_md) const final { + return topic_md.get_configuration() + .properties.iceberg_partition_spec.value_or( + parent.default_partition_spec_()); + } + const coordinator& parent; }; @@ -364,6 +403,9 @@ coordinator::sync_ensure_table_exists( struct coordinator::dlq_table_schema_provider : public coordinator::table_schema_provider { + explicit dlq_table_schema_provider(coordinator& parent) + : parent(parent) {} + iceberg::table_identifier get_id(const model::topic& topic) const final { return table_id_provider::dlq_table_id(topic); } @@ -372,6 +414,12 @@ struct coordinator::dlq_table_schema_provider get_record_type(record_schema_components) const final { co_return key_value_translator{}.build_type(std::nullopt).type; } + + ss::sstring get_partition_spec(const cluster::topic_metadata&) const final { + return parent.default_partition_spec_(); + } + + const coordinator& parent; }; ss::future> @@ -382,7 +430,7 @@ coordinator::sync_ensure_dlq_table_exists( topic_revision, record_schema_components{}, "sync_ensure_dlq_table_exists", - dlq_table_schema_provider{}); + dlq_table_schema_provider{*this}); } ss::future> diff --git a/src/v/datalake/coordinator/tests/coordinator_test.cc b/src/v/datalake/coordinator/tests/coordinator_test.cc index 4c3fb09e3dad..796ff313a48e 100644 --- a/src/v/datalake/coordinator/tests/coordinator_test.cc +++ b/src/v/datalake/coordinator/tests/coordinator_test.cc @@ -241,6 +241,23 @@ class CoordinatorTest : public raft::raft_fixture { raft::raft_fixture::TearDownAsync().get(); } + void register_in_topic_tables( + const model::topic& topic, model::revision_id rev) { + auto topic_cfg = cluster::topic_configuration( + model::kafka_namespace, topic, 1, 1); + for (auto& crd : crds) { + auto tt_res = crd->topic_table + .apply( + cluster::create_topic_cmd{ + model::topic_namespace{ + model::kafka_namespace, topic}, + {topic_cfg, {}}}, + model::offset{rev()}) + .get(); + ASSERT_EQ(tt_res, cluster::errc::success); + } + } + // Returns the coordinator on the current leader. using opt_ref = std::optional>; opt_ref leader_node() { @@ -332,8 +349,10 @@ TEST_F(CoordinatorTest, TestAddFilesHappyPath) { const auto tp00 = tp(0, 0); const auto tp01 = tp(0, 1); const model::revision_id rev0{1}; + register_in_topic_tables(tp00.topic, rev0); const auto tp10 = tp(1, 0); const model::revision_id rev1{2}; + register_in_topic_tables(tp10.topic, rev1); leader.ensure_table(tp00.topic, rev0); pairs_t total_expected_00; @@ -400,6 +419,7 @@ TEST_F(CoordinatorTest, TestLastAddedHappyPath) { const auto tp00 = tp(0, 0); const auto tp01 = tp(0, 1); const model::revision_id rev{1}; + register_in_topic_tables(tp00.topic, rev); leader.ensure_table(tp00.topic, rev); pairs_t total_expected_00; for (const auto& v : @@ -433,6 +453,7 @@ TEST_F(CoordinatorTest, TestNotLeader) { auto& non_leader = non_leader_opt->get(); const auto tp00 = tp(0, 0); const model::revision_id rev{1}; + register_in_topic_tables(tp00.topic, rev); leader_opt.value().get().ensure_table(tp00.topic, rev); pairs_t total_expected_00; @@ -461,6 +482,7 @@ TEST_P(CoordinatorTestWithParams, TestConcurrentAddFiles) { } const auto tp00 = tp(0, 0); const model::revision_id rev0{1}; + register_in_topic_tables(tp00.topic, rev0); bool done = false; std::vector> adders; int fiber_id = 0; @@ -569,6 +591,7 @@ TEST_F(CoordinatorLoopTest, TestCommitFilesHappyPath) { auto& leader = leader_opt->get(); const auto tp00 = tp(0, 0); const model::revision_id rev0{1}; + register_in_topic_tables(tp00.topic, rev0); leader.ensure_table(tp00.topic, rev0); auto add_res = leader.crd .sync_add_files(tp00, rev0, make_pending_files({{0, 100}})) @@ -599,6 +622,7 @@ TEST_F(CoordinatorLoopTest, TestCommitFilesNotLeader) { auto& leader = leader_opt->get(); const auto tp00 = tp(0, 0); const model::revision_id rev0{1}; + register_in_topic_tables(tp00.topic, rev0); leader.ensure_table(tp00.topic, rev0); auto add_res = leader.crd .sync_add_files(tp00, rev0, make_pending_files({{0, 100}})) @@ -680,6 +704,7 @@ TEST_F(CoordinatorSleepingLoopTest, TestQuickShutdownOnLeadershipChange) { for (int i = 0; i < 100; i++) { auto t = tp(i, 0); auto rev = model::revision_id{i}; + register_in_topic_tables(t.topic, rev); leader.ensure_table(t.topic, rev); auto add_res = leader.crd .sync_add_files(t, rev, make_pending_files({{0, 100}})) diff --git a/src/v/datalake/coordinator/tests/state_machine_test.cc b/src/v/datalake/coordinator/tests/state_machine_test.cc index 2e2b5404ea12..cc9eb2783e53 100644 --- a/src/v/datalake/coordinator/tests/state_machine_test.cc +++ b/src/v/datalake/coordinator/tests/state_machine_test.cc @@ -127,11 +127,20 @@ struct coordinator_stm_fixture : stm_raft_fixture { model::topic_partition random_tp() const { return { - model::topic{"test"}, + tp_ns.tp, model::partition_id( random_generators::get_int(0, max_partitions - 1))}; } + ss::future<> register_in_topic_table() { + auto topic_cfg = cluster::topic_configuration( + tp_ns.ns, tp_ns.tp, /*partition_count=*/1, /*replication_factor=*/1); + auto tt_res = co_await topic_table.apply( + cluster::create_topic_cmd{tp_ns, {topic_cfg, {}}}, + model::offset{rev()}); + ASSERT_EQ_CORO(tt_res, cluster::errc::success); + } + ss::future< checked> remove_tombstone(const model::topic&, model::revision_id) { @@ -139,7 +148,8 @@ struct coordinator_stm_fixture : stm_raft_fixture { } static constexpr int32_t max_partitions = 5; - model::topic_partition tp{model::topic{"test"}, model::partition_id{0}}; + model::topic_namespace tp_ns{model::kafka_namespace, model::topic{"test"}}; + model::topic_partition tp{tp_ns.tp, model::partition_id{0}}; model::revision_id rev{123}; cluster::data_migrations::migrated_resources mr; cluster::topic_table topic_table{mr}; @@ -152,6 +162,7 @@ struct coordinator_stm_fixture : stm_raft_fixture { TEST_F_CORO(coordinator_stm_fixture, test_snapshots) { co_await initialize(); co_await wait_for_leader(5s); + co_await register_in_topic_table(); // populate some data until the state machine is snapshotted // a few times @@ -229,7 +240,7 @@ TEST_F_CORO(coordinator_stm_fixture, test_snapshots) { for (int32_t pid = 0; pid < max_partitions; pid++) { auto committed_offsets = last_committed_offsets( - {model::topic{"test"}, model::partition_id{pid}}); + {tp_ns.tp, model::partition_id{pid}}); vlog(logger().info, "committed offsets: {}", committed_offsets); ASSERT_TRUE_CORO(std::equal( committed_offsets.begin() + 1, From a9e58304e3116654d96f3a5a6de4f511aaee7e53 Mon Sep 17 00:00:00 2001 From: Alexey Zatelepin Date: Mon, 20 Jan 2025 19:44:45 +0100 Subject: [PATCH 12/15] datalake: move hour_partition_spec() to test_utils As we now set the partition spec from config property values, we don't need a hardcoded spec in the main binary. --- src/v/datalake/BUILD | 4 ---- src/v/datalake/coordinator/BUILD | 1 - src/v/datalake/coordinator/coordinator.cc | 1 - src/v/datalake/coordinator/tests/BUILD | 1 + src/v/datalake/coordinator/tests/CMakeLists.txt | 1 + .../tests/iceberg_file_committer_test.cc | 1 + src/v/datalake/table_definition.cc | 14 -------------- src/v/datalake/table_definition.h | 4 ---- src/v/datalake/tests/BUILD | 3 ++- src/v/datalake/tests/CMakeLists.txt | 1 + src/v/datalake/tests/partitioning_writer_test.cc | 1 + src/v/datalake/tests/test_utils.cc | 13 ++++++++++++- src/v/datalake/tests/test_utils.h | 3 +++ 13 files changed, 22 insertions(+), 26 deletions(-) diff --git a/src/v/datalake/BUILD b/src/v/datalake/BUILD index ca4ae9fc3133..39ecf3c34fe0 100644 --- a/src/v/datalake/BUILD +++ b/src/v/datalake/BUILD @@ -235,13 +235,9 @@ redpanda_cc_library( hdrs = [ "table_definition.h", ], - implementation_deps = [ - "//src/v/iceberg:transform", - ], include_prefix = "datalake", visibility = [":__subpackages__"], deps = [ - "//src/v/iceberg:partition", "//src/v/iceberg:schema", ], ) diff --git a/src/v/datalake/coordinator/BUILD b/src/v/datalake/coordinator/BUILD index 8f21cfda4030..31058ee06de4 100644 --- a/src/v/datalake/coordinator/BUILD +++ b/src/v/datalake/coordinator/BUILD @@ -43,7 +43,6 @@ redpanda_cc_library( "//src/v/datalake:record_schema_resolver", "//src/v/datalake:record_translator", "//src/v/datalake:schema_identifier", - "//src/v/datalake:table_definition", "//src/v/datalake:table_id_provider", "//src/v/datalake:types", "//src/v/ssx:future_util", diff --git a/src/v/datalake/coordinator/coordinator.cc b/src/v/datalake/coordinator/coordinator.cc index acb402a90fe4..1997602534cb 100644 --- a/src/v/datalake/coordinator/coordinator.cc +++ b/src/v/datalake/coordinator/coordinator.cc @@ -16,7 +16,6 @@ #include "datalake/coordinator/state_update.h" #include "datalake/logger.h" #include "datalake/record_translator.h" -#include "datalake/table_definition.h" #include "datalake/table_id_provider.h" #include "model/fundamental.h" #include "ssx/future-util.h" diff --git a/src/v/datalake/coordinator/tests/BUILD b/src/v/datalake/coordinator/tests/BUILD index dc883af5258c..63af8fc70dee 100644 --- a/src/v/datalake/coordinator/tests/BUILD +++ b/src/v/datalake/coordinator/tests/BUILD @@ -66,6 +66,7 @@ redpanda_cc_gtest( "//src/v/datalake:logger", "//src/v/datalake:table_definition", "//src/v/datalake/coordinator:iceberg_file_committer", + "//src/v/datalake/tests:test_utils", "//src/v/iceberg:filesystem_catalog", "//src/v/iceberg:manifest_entry", "//src/v/iceberg:manifest_io", diff --git a/src/v/datalake/coordinator/tests/CMakeLists.txt b/src/v/datalake/coordinator/tests/CMakeLists.txt index d72700bddf02..85568a05a5c5 100644 --- a/src/v/datalake/coordinator/tests/CMakeLists.txt +++ b/src/v/datalake/coordinator/tests/CMakeLists.txt @@ -37,6 +37,7 @@ rp_test( v::cloud_io_utils v::datalake_coordinator v::datalake_coordinator_test_utils + v::datalake_test_utils v::gtest_main v::iceberg v::raft_fixture diff --git a/src/v/datalake/coordinator/tests/iceberg_file_committer_test.cc b/src/v/datalake/coordinator/tests/iceberg_file_committer_test.cc index f2bbed30581f..a19c26f42fa3 100644 --- a/src/v/datalake/coordinator/tests/iceberg_file_committer_test.cc +++ b/src/v/datalake/coordinator/tests/iceberg_file_committer_test.cc @@ -14,6 +14,7 @@ #include "datalake/coordinator/iceberg_file_committer.h" #include "datalake/coordinator/tests/state_test_utils.h" #include "datalake/table_definition.h" +#include "datalake/tests/test_utils.h" #include "iceberg/filesystem_catalog.h" #include "iceberg/manifest_entry.h" #include "iceberg/manifest_io.h" diff --git a/src/v/datalake/table_definition.cc b/src/v/datalake/table_definition.cc index 634de8a649ef..d2fab622a0f4 100644 --- a/src/v/datalake/table_definition.cc +++ b/src/v/datalake/table_definition.cc @@ -9,8 +9,6 @@ */ #include "datalake/table_definition.h" -#include "iceberg/transform.h" - namespace datalake { using namespace iceberg; struct_type schemaless_struct_type() { @@ -54,16 +52,4 @@ schema default_schema() { }; } -unresolved_partition_spec hour_partition_spec() { - chunked_vector fields; - fields.push_back({ - .source_name = {"redpanda", "timestamp"}, - .transform = hour_transform{}, - .name = "redpanda_timestamp_hour", - }); - return { - .fields = std::move(fields), - }; -} - } // namespace datalake diff --git a/src/v/datalake/table_definition.h b/src/v/datalake/table_definition.h index 4c0c365e8e50..6eb8f70791f0 100644 --- a/src/v/datalake/table_definition.h +++ b/src/v/datalake/table_definition.h @@ -9,7 +9,6 @@ */ #pragma once -#include "iceberg/partition.h" #include "iceberg/schema.h" namespace datalake { @@ -22,7 +21,4 @@ iceberg::struct_type schemaless_struct_type(); iceberg::schema default_schema(); inline constexpr std::string_view rp_struct_name = "redpanda"; -// Hourly partitioning on the timestamp of a schema with the above fields. -iceberg::unresolved_partition_spec hour_partition_spec(); - } // namespace datalake diff --git a/src/v/datalake/tests/BUILD b/src/v/datalake/tests/BUILD index 2c8030dbdd6e..921ae0e49f2c 100644 --- a/src/v/datalake/tests/BUILD +++ b/src/v/datalake/tests/BUILD @@ -152,7 +152,7 @@ redpanda_test_cc_library( "//src/v/datalake:table_id_provider", ], include_prefix = "datalake/tests", - visibility = [":__subpackages__"], + visibility = ["//visibility:public"], deps = [ "//src/v/datalake:catalog_schema_manager", "//src/v/datalake:record_schema_resolver", @@ -217,6 +217,7 @@ redpanda_cc_gtest( ], deps = [ ":test_data_writer", + ":test_utils", "//src/v/bytes", "//src/v/datalake:partitioning_writer", "//src/v/datalake:table_definition", diff --git a/src/v/datalake/tests/CMakeLists.txt b/src/v/datalake/tests/CMakeLists.txt index e94963febef6..cc0d683d20a6 100644 --- a/src/v/datalake/tests/CMakeLists.txt +++ b/src/v/datalake/tests/CMakeLists.txt @@ -50,6 +50,7 @@ rp_test( v::bytes v::utils v::gtest_main + v::datalake_test_utils v::datalake_writer v::iceberg_test_utils v::schema diff --git a/src/v/datalake/tests/partitioning_writer_test.cc b/src/v/datalake/tests/partitioning_writer_test.cc index 3b4c38216a02..dd01109cf5ee 100644 --- a/src/v/datalake/tests/partitioning_writer_test.cc +++ b/src/v/datalake/tests/partitioning_writer_test.cc @@ -11,6 +11,7 @@ #include "datalake/partitioning_writer.h" #include "datalake/table_definition.h" #include "datalake/tests/test_data_writer.h" +#include "datalake/tests/test_utils.h" #include "iceberg/tests/test_schemas.h" #include "iceberg/tests/value_generator.h" #include "model/timestamp.h" diff --git a/src/v/datalake/tests/test_utils.cc b/src/v/datalake/tests/test_utils.cc index ef620bab0318..a2e17228af0f 100644 --- a/src/v/datalake/tests/test_utils.cc +++ b/src/v/datalake/tests/test_utils.cc @@ -11,13 +11,24 @@ #include "datalake/tests/test_utils.h" #include "datalake/record_translator.h" -#include "datalake/table_definition.h" #include "datalake/table_id_provider.h" #include namespace datalake { +iceberg::unresolved_partition_spec hour_partition_spec() { + chunked_vector fields; + fields.push_back({ + .source_name = {"redpanda", "timestamp"}, + .transform = iceberg::hour_transform{}, + .name = "redpanda.timestamp_hour", + }); + return { + .fields = std::move(fields), + }; +} + direct_table_creator::direct_table_creator( type_resolver& tr, schema_manager& sm) : type_resolver_(tr) diff --git a/src/v/datalake/tests/test_utils.h b/src/v/datalake/tests/test_utils.h index f0339d26d61b..6d73b38bcf2b 100644 --- a/src/v/datalake/tests/test_utils.h +++ b/src/v/datalake/tests/test_utils.h @@ -15,6 +15,9 @@ namespace datalake { +// Hourly partitioning on the redpanda.timestamp field. +iceberg::unresolved_partition_spec hour_partition_spec(); + // Creates or alters the table by interfacing directly with a catalog. class direct_table_creator : public table_creator { public: From eba131f425b6ee32f0b42cecc799736896dc1350 Mon Sep 17 00:00:00 2001 From: Alexey Zatelepin Date: Fri, 3 Jan 2025 16:15:56 +0100 Subject: [PATCH 13/15] TO REMOVE: print create_table catalog request json --- src/v/iceberg/rest_client/catalog_client.cc | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/src/v/iceberg/rest_client/catalog_client.cc b/src/v/iceberg/rest_client/catalog_client.cc index 668ff484ec84..5cbbeacd561d 100644 --- a/src/v/iceberg/rest_client/catalog_client.cc +++ b/src/v/iceberg/rest_client/catalog_client.cc @@ -14,6 +14,7 @@ #include "http/request_builder.h" #include "http/utils.h" #include "iceberg/json_writer.h" +#include "iceberg/logger.h" #include "iceberg/rest_client/entities.h" #include "iceberg/rest_client/json.h" #include "iceberg/table_requests_json.h" @@ -216,6 +217,13 @@ ss::future> catalog_client::create_table( .with_bearer_auth(token.value()) .with_content_type(json_content_type); + auto json_buf = serialize_payload_as_json(req); + ss::sstring json_str; + for (const auto& frag : json_buf) { + json_str.append(frag.get(), frag.size()); + } + vlog(log.info, "FFF {}", json_str); + co_return (co_await perform_request( rtc, http_request, serialize_payload_as_json(req))) .and_then(parse_json) From 54cff39fa1079a4295ce1b72072d9564fdfe53bd Mon Sep 17 00:00:00 2001 From: Alexey Zatelepin Date: Fri, 3 Jan 2025 22:08:50 +0100 Subject: [PATCH 14/15] WIP add days transform impl --- src/v/iceberg/BUILD | 1 + src/v/iceberg/time_transform_visitor.cc | 24 +++++++++++++++++++++++- src/v/iceberg/time_transform_visitor.h | 10 ++++++++++ src/v/iceberg/transform_utils.cc | 5 +++++ 4 files changed, 39 insertions(+), 1 deletion(-) diff --git a/src/v/iceberg/BUILD b/src/v/iceberg/BUILD index cef541dfb059..2979804f535b 100644 --- a/src/v/iceberg/BUILD +++ b/src/v/iceberg/BUILD @@ -954,6 +954,7 @@ redpanda_cc_library( include_prefix = "iceberg", deps = [ ":values", + "@seastar", ], ) diff --git a/src/v/iceberg/time_transform_visitor.cc b/src/v/iceberg/time_transform_visitor.cc index c6ad8c1aba3b..286c224e7e56 100644 --- a/src/v/iceberg/time_transform_visitor.cc +++ b/src/v/iceberg/time_transform_visitor.cc @@ -10,15 +10,26 @@ #include "iceberg/values.h" +#include + namespace iceberg { namespace { + +static constexpr int64_t micros_per_s = 1'000'000; + int32_t micros_to_hr(int64_t micros) { static constexpr int64_t s_per_hr = 3600; - static constexpr int64_t micros_per_s = 1000000; static constexpr int64_t micros_per_hr = micros_per_s * s_per_hr; return static_cast(micros / micros_per_hr); } + +int32_t micros_to_day(int64_t micros) { + static constexpr int64_t s_per_day = 86400; + static constexpr int64_t micros_per_day = micros_per_s * s_per_day; + return static_cast(micros / micros_per_day); +} + } // namespace int32_t hour_transform_visitor::operator()(const primitive_value& v) { @@ -35,4 +46,15 @@ int32_t hour_transform_visitor::operator()(const primitive_value& v) { fmt::format("hourly_visitor not implemented for primitive value {}", v)); } +int32_t day_transform_visitor::operator()(const primitive_value& v) { + return ss::visit( + v, + [](const timestamp_value& v) { return micros_to_day(v.val); }, + [](const timestamptz_value& v) { return micros_to_day(v.val); }, + [](const auto& v) -> int32_t { + throw std::invalid_argument(fmt::format( + "day_transform_visitor not implemented for primitive value {}", v)); + }); +} + } // namespace iceberg diff --git a/src/v/iceberg/time_transform_visitor.h b/src/v/iceberg/time_transform_visitor.h index 749027a326d1..b167d00cde87 100644 --- a/src/v/iceberg/time_transform_visitor.h +++ b/src/v/iceberg/time_transform_visitor.h @@ -22,4 +22,14 @@ struct hour_transform_visitor { } }; +struct day_transform_visitor { + int32_t operator()(const primitive_value& v); + + template + int32_t operator()(const T& t) { + throw std::invalid_argument( + fmt::format("day_transform_visitor not implemented for value {}", t)); + } +}; + } // namespace iceberg diff --git a/src/v/iceberg/transform_utils.cc b/src/v/iceberg/transform_utils.cc index 84af4a4826d1..45bf00685c80 100644 --- a/src/v/iceberg/transform_utils.cc +++ b/src/v/iceberg/transform_utils.cc @@ -33,6 +33,11 @@ struct transform_applying_visitor { return v; } + value operator()(const day_transform&) { + int_value v{std::visit(day_transform_visitor{}, source_val_)}; + return v; + } + template value operator()(const T&) { throw std::invalid_argument( From 8af772e788176b811c313c672194acbed740c897 Mon Sep 17 00:00:00 2001 From: Alexey Zatelepin Date: Mon, 20 Jan 2025 21:18:18 +0100 Subject: [PATCH 15/15] WIP k/create_topics: iceberg_partition_spec property validation --- src/v/kafka/CMakeLists.txt | 1 + src/v/kafka/server/BUILD | 1 + .../kafka/server/handlers/topics/validators.h | 34 ++++++++++++++----- 3 files changed, 27 insertions(+), 9 deletions(-) diff --git a/src/v/kafka/CMakeLists.txt b/src/v/kafka/CMakeLists.txt index aac6d12d02a6..998525241d1f 100644 --- a/src/v/kafka/CMakeLists.txt +++ b/src/v/kafka/CMakeLists.txt @@ -62,6 +62,7 @@ v_cc_library( v::bytes v::rpc v::cluster + v::iceberg_unresolved_partition_spec v::kafka_partition_proxy v::kafka_protocol v::security diff --git a/src/v/kafka/server/BUILD b/src/v/kafka/server/BUILD index 8214764ca415..3ec1e0f70089 100644 --- a/src/v/kafka/server/BUILD +++ b/src/v/kafka/server/BUILD @@ -176,6 +176,7 @@ redpanda_cc_library( "//src/v/features", "//src/v/hashing:jump_consistent", "//src/v/hashing:xx", + "//src/v/iceberg:unresolved_partition_spec", "//src/v/kafka/data:partition_proxy", "//src/v/kafka/protocol", "//src/v/kafka/protocol:add_offsets_to_txn", diff --git a/src/v/kafka/server/handlers/topics/validators.h b/src/v/kafka/server/handlers/topics/validators.h index 79b26d4a14bb..21162bba2f60 100644 --- a/src/v/kafka/server/handlers/topics/validators.h +++ b/src/v/kafka/server/handlers/topics/validators.h @@ -11,6 +11,7 @@ #pragma once #include "config/configuration.h" +#include "iceberg/unresolved_partition_spec.h" #include "kafka/protocol/schemata/create_topics_request.h" #include "kafka/protocol/schemata/create_topics_response.h" #include "kafka/server/handlers/topics/types.h" @@ -276,22 +277,37 @@ struct iceberg_config_validator { static constexpr error_code ec = error_code::invalid_config; static bool is_valid(const creatable_topic& c) { - auto it = std::find_if( + model::iceberg_mode parsed_mode = model::iceberg_mode::disabled; + + auto mode_it = std::find_if( c.configs.begin(), c.configs.end(), [](const createable_topic_config& cfg) { return cfg.name == topic_property_iceberg_mode; }); - if (it == c.configs.end() || !it->value.has_value()) { - return true; + if (mode_it != c.configs.end() && mode_it->value.has_value()) { + try { + parsed_mode = boost::lexical_cast( + mode_it->value.value()); + } catch (...) { + return false; + } } - model::iceberg_mode parsed_mode; - try { - parsed_mode = boost::lexical_cast( - it->value.value()); - } catch (...) { - return false; + + auto pspec_it = std::find_if( + c.configs.begin(), + c.configs.end(), + [](const createable_topic_config& cfg) { + return cfg.name == topic_property_iceberg_partition_spec; + }); + if (pspec_it != c.configs.end() && pspec_it->value.has_value()) { + auto parsed = iceberg::unresolved_partition_spec::parse( + pspec_it->value.value()); + if (!parsed.has_value()) { + return false; + } } + // If iceberg is enabled at the cluster level, the topic can // be created with any override. If it is disabled // at the cluster level, it cannot be enabled with a topic