Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP] Iceberg custom partitioning #24838

Draft
wants to merge 15 commits into
base: dev
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions src/v/cloud_storage/tests/topic_manifest_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -484,6 +484,7 @@ SEASTAR_THREAD_TEST_CASE(test_topic_manifest_serde_feature_table) {
false,
tristate<std::chrono::milliseconds>{},
std::nullopt,
std::nullopt,
};

auto random_initial_revision_id
Expand Down
9 changes: 6 additions & 3 deletions src/v/cluster/topic_properties.cc
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,8 @@ std::ostream& operator<<(std::ostream& o, const topic_properties& properties) {
"remote_label: {}, iceberg_mode: {}, "
"leaders_preference: {}, "
"delete_retention_ms: {}, "
"iceberg_delete: {}",
"iceberg_delete: {}, ",
"iceberg_partition_spec: {}",
properties.compression,
properties.cleanup_policy_bitflags,
properties.compaction_strategy,
Expand Down Expand Up @@ -79,7 +80,8 @@ std::ostream& operator<<(std::ostream& o, const topic_properties& properties) {
properties.iceberg_mode,
properties.leaders_preference,
properties.delete_retention_ms,
properties.iceberg_delete);
properties.iceberg_delete,
properties.iceberg_partition_spec);

if (config::shard_local_cfg().development_enable_cloud_topics()) {
fmt::print(
Expand Down Expand Up @@ -125,7 +127,7 @@ bool topic_properties::has_overrides() const {
|| flush_bytes.has_value() || remote_label.has_value()
|| (iceberg_mode != storage::ntp_config::default_iceberg_mode)
|| leaders_preference.has_value() || delete_retention_ms.is_engaged()
|| iceberg_delete.has_value();
|| iceberg_delete.has_value() || iceberg_partition_spec.has_value();

if (config::shard_local_cfg().development_enable_cloud_topics()) {
return overrides
Expand Down Expand Up @@ -261,6 +263,7 @@ adl<cluster::topic_properties>::from(iobuf_parser& parser) {
false,
tristate<std::chrono::milliseconds>{disable_tristate},
std::nullopt,
std::nullopt,
};
}

Expand Down
12 changes: 8 additions & 4 deletions src/v/cluster/topic_properties.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ namespace cluster {
*/
struct topic_properties
: serde::
envelope<topic_properties, serde::version<10>, serde::compat_version<0>> {
envelope<topic_properties, serde::version<11>, serde::compat_version<0>> {
topic_properties() noexcept = default;
topic_properties(
std::optional<model::compression> compression,
Expand Down Expand Up @@ -76,7 +76,8 @@ struct topic_properties
std::optional<config::leaders_preference> leaders_preference,
bool cloud_topic_enabled,
tristate<std::chrono::milliseconds> delete_retention_ms,
std::optional<bool> iceberg_delete)
std::optional<bool> iceberg_delete,
std::optional<ss::sstring> iceberg_partition_spec)
: compression(compression)
, cleanup_policy_bitflags(cleanup_policy_bitflags)
, compaction_strategy(compaction_strategy)
Expand Down Expand Up @@ -118,7 +119,8 @@ struct topic_properties
, leaders_preference(std::move(leaders_preference))
, cloud_topic_enabled(cloud_topic_enabled)
, delete_retention_ms(delete_retention_ms)
, iceberg_delete(iceberg_delete) {}
, iceberg_delete(iceberg_delete)
, iceberg_partition_spec(std::move(iceberg_partition_spec)) {}

std::optional<model::compression> compression;
std::optional<model::cleanup_policy_bitflags> cleanup_policy_bitflags;
Expand Down Expand Up @@ -194,6 +196,7 @@ struct topic_properties
tristate<std::chrono::milliseconds> delete_retention_ms{disable_tristate};
// Should we delete the corresponding iceberg table when deleting the topic.
std::optional<bool> iceberg_delete;
std::optional<ss::sstring> iceberg_partition_spec;

bool is_compacted() const;
bool has_overrides() const;
Expand Down Expand Up @@ -241,7 +244,8 @@ struct topic_properties
leaders_preference,
cloud_topic_enabled,
delete_retention_ms,
iceberg_delete);
iceberg_delete,
iceberg_partition_spec);
}

friend bool operator==(const topic_properties&, const topic_properties&)
Expand Down
3 changes: 3 additions & 0 deletions src/v/cluster/topic_table.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1101,6 +1101,9 @@ topic_properties topic_table::update_topic_properties(
updated_properties.delete_retention_ms, overrides.delete_retention_ms);
incremental_update(
updated_properties.iceberg_delete, overrides.iceberg_delete);
incremental_update(
updated_properties.iceberg_partition_spec,
overrides.iceberg_partition_spec);
return updated_properties;
}

Expand Down
6 changes: 4 additions & 2 deletions src/v/cluster/types.cc
Original file line number Diff line number Diff line change
Expand Up @@ -386,7 +386,8 @@ std::ostream& operator<<(std::ostream& o, const incremental_topic_updates& i) {
"initial_retention_local_target_bytes: {}, "
"initial_retention_local_target_ms: {}, write_caching: {}, flush_ms: {}, "
"flush_bytes: {}, iceberg_enabled: {}, leaders_preference: {}, "
"remote_read: {}, remote_write: {}, iceberg_delete: {}",
"remote_read: {}, remote_write: {}, iceberg_delete: {}, "
"iceberg_partition_spec: {}",
i.compression,
i.cleanup_policy_bitflags,
i.compaction_strategy,
Expand Down Expand Up @@ -417,7 +418,8 @@ std::ostream& operator<<(std::ostream& o, const incremental_topic_updates& i) {
i.leaders_preference,
i.remote_read,
i.remote_write,
i.iceberg_delete);
i.iceberg_delete,
i.iceberg_partition_spec);
return o;
}

Expand Down
6 changes: 4 additions & 2 deletions src/v/cluster/types.h
Original file line number Diff line number Diff line change
Expand Up @@ -568,7 +568,7 @@ struct property_update<tristate<T>>
struct incremental_topic_updates
: serde::envelope<
incremental_topic_updates,
serde::version<7>,
serde::version<8>,
serde::compat_version<0>> {
static constexpr int8_t version_with_data_policy = -1;
static constexpr int8_t version_with_shadow_indexing = -3;
Expand Down Expand Up @@ -641,6 +641,7 @@ struct incremental_topic_updates
leaders_preference;
property_update<tristate<std::chrono::milliseconds>> delete_retention_ms;
property_update<std::optional<bool>> iceberg_delete;
property_update<std::optional<ss::sstring>> iceberg_partition_spec;

// To allow us to better control use of the deprecated shadow_indexing
// field, use getters and setters instead.
Expand Down Expand Up @@ -680,7 +681,8 @@ struct incremental_topic_updates
remote_read,
remote_write,
delete_retention_ms,
iceberg_delete);
iceberg_delete,
iceberg_partition_spec);
}

friend std::ostream&
Expand Down
1 change: 1 addition & 0 deletions src/v/compat/cluster_generator.h
Original file line number Diff line number Diff line change
Expand Up @@ -655,6 +655,7 @@ struct instance_generator<cluster::topic_properties> {
std::nullopt,
false,
tristate<std::chrono::milliseconds>{disable_tristate},
std::nullopt,
std::nullopt};
}

Expand Down
9 changes: 9 additions & 0 deletions src/v/config/configuration.cc
Original file line number Diff line number Diff line change
Expand Up @@ -3793,6 +3793,15 @@ configuration::configuration()
"the topic.",
{.needs_restart = needs_restart::no, .visibility = visibility::user},
true)
, iceberg_default_partition_spec(
*this,
"iceberg_default_partition_spec",
"Default value for the redpanda.iceberg.partition.spec topic property "
"that determines the partition spec for the Iceberg table corresponding "
"to the topic.",
{.needs_restart = needs_restart::no, .visibility = visibility::user},
"(hour(redpanda.timestamp))",
&validate_non_empty_string_opt)
, development_enable_cloud_topics(
*this,
"development_enable_cloud_topics",
Expand Down
1 change: 1 addition & 0 deletions src/v/config/configuration.h
Original file line number Diff line number Diff line change
Expand Up @@ -719,6 +719,7 @@ struct configuration final : public config_store {
property<std::optional<ss::sstring>> iceberg_rest_catalog_prefix;

property<bool> iceberg_delete;
property<ss::sstring> iceberg_default_partition_spec;

configuration();

Expand Down
4 changes: 0 additions & 4 deletions src/v/datalake/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -235,13 +235,9 @@ redpanda_cc_library(
hdrs = [
"table_definition.h",
],
implementation_deps = [
"//src/v/iceberg:transform",
],
include_prefix = "datalake",
visibility = [":__subpackages__"],
deps = [
"//src/v/iceberg:partition",
"//src/v/iceberg:schema",
],
)
Expand Down
1 change: 0 additions & 1 deletion src/v/datalake/coordinator/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,6 @@ redpanda_cc_library(
"//src/v/datalake:record_schema_resolver",
"//src/v/datalake:record_translator",
"//src/v/datalake:schema_identifier",
"//src/v/datalake:table_definition",
"//src/v/datalake:table_id_provider",
"//src/v/datalake:types",
"//src/v/ssx:future_util",
Expand Down
Loading
Loading