Merge pull request #24472 from redpanda-data/stephan/partition-memory-groups

Make partitions memory group aware
StephanDollberg authored Jan 2, 2025
2 parents f28f9fa + 9427349 commit e32dc25
Showing 19 changed files with 408 additions and 55 deletions.
14 changes: 14 additions & 0 deletions src/v/cluster/BUILD
@@ -215,6 +215,18 @@ redpanda_cc_library(
],
)

redpanda_cc_library(
name = "topic_memory_per_partition_default",
hdrs = [
"scheduling/topic_memory_per_partition_default.h",
],
include_prefix = "cluster",
visibility = ["//visibility:public"],
deps = [
"//src/v/base",
],
)

# TODO the following headers are the only headers in redpanda which are excluded
# from clang-tidy. if you remove the exclusion then you'll observe a tangled web
# of header dependencies. after many hours those have not yet been resolved, so
@@ -617,7 +629,9 @@ redpanda_cc_library(
"version.h",
],
implementation_deps = [
":topic_memory_per_partition_default",
"//src/v/features:enterprise_feature_messages",
"//src/v/resource_mgmt:memory_groups",
],
include_prefix = "cluster",
visibility = ["//visibility:public"],
1 change: 0 additions & 1 deletion src/v/cluster/controller.cc
@@ -153,7 +153,6 @@ ss::future<> controller::wire_up() {
return _partition_allocator.start_single(
std::ref(_members_table),
std::ref(_feature_table),
config::shard_local_cfg().topic_memory_per_partition.bind(),
config::shard_local_cfg().topic_fds_per_partition.bind(),
config::shard_local_cfg().topic_partitions_per_shard.bind(),
config::shard_local_cfg().topic_partitions_reserve_shard0.bind(),
124 changes: 102 additions & 22 deletions src/v/cluster/scheduling/partition_allocator.cc
@@ -15,12 +15,14 @@
#include "cluster/logger.h"
#include "cluster/members_table.h"
#include "cluster/scheduling/constraints.h"
#include "cluster/scheduling/topic_memory_per_partition_default.h"
#include "cluster/scheduling/types.h"
#include "cluster/types.h"
#include "config/configuration.h"
#include "features/feature_table.h"
#include "model/metadata.h"
#include "random/generators.h"
#include "resource_mgmt/memory_groups.h"
#include "ssx/async_algorithm.h"
#include "utils/human.h"

@@ -45,7 +47,6 @@ namespace cluster {
partition_allocator::partition_allocator(
ss::sharded<members_table>& members,
ss::sharded<features::feature_table>& feature_table,
config::binding<std::optional<size_t>> memory_per_partition,
config::binding<std::optional<int32_t>> fds_per_partition,
config::binding<uint32_t> partitions_per_shard,
config::binding<uint32_t> partitions_reserve_shard0,
@@ -58,7 +59,6 @@ partition_allocator::partition_allocator(
internal_kafka_topics))
, _members(members)
, _feature_table(feature_table.local())
, _memory_per_partition(std::move(memory_per_partition))
, _fds_per_partition(std::move(fds_per_partition))
, _partitions_per_shard(std::move(partitions_per_shard))
, _partitions_reserve_shard0(std::move(partitions_reserve_shard0))
@@ -81,6 +81,99 @@ allocation_constraints partition_allocator::default_constraints() {
return req;
}

bool guess_is_memory_group_aware_memory_limit() {
// Here we are trying to guess whether `topic_memory_per_partition` is
// memory group aware.
//
// Originally the property was chosen such that, with
// `topic_partitions_per_shard` maxed out, we wouldn't run into OOM issues
// on hosts with 4 or 8 GiB of memory per shard.
//
// These days we have a better understanding of how much memory a partition
// actually uses at rest based on memory profiler and metrics analysis.
// Hence we now set it to a more realistic value in the XXXKiB range.
//
// At the same time we also made the memory_groups reserve a percentage of
// the total memory space for partitions. Instead of dividing the whole
// memory space by `topic_memory_per_partition` we now only do that with the
// reserved value.
//
// Because of the latter, making this change isn't trivial. Users might have
// lowered the `topic_memory_per_partition` property to gain higher
// partition density. If we only used the reserved memory space when
// checking memory limits, that could now actually result in lower
// partition density. Hence, we need to guess whether the property was set
// before this change or after. If it was set before, we just use the old
// rules (while still reserving a part of the memory) when checking memory
// limits, so as not to break existing behaviour.
//
// Note that getting this guess wrong isn't immediately fatal. It only
// becomes relevant when creating new topics or modifying existing
// topics. At that point users can increase the memory reservation if
// needed.

// not overridden, so definitely memory group aware
if (!config::shard_local_cfg().topic_memory_per_partition.is_overriden()) {
return true;
}

auto value = config::shard_local_cfg().topic_memory_per_partition.value();

// The previous default was 4 MiB. The new one is more than 10x smaller. We
// assume it's unlikely someone would have lowered the value that far, and
// hence guess that all values larger than twice the new default are old
// values. Equally, in the new world it's unlikely anybody would increase
// the value at all.

return value < 2 * ORIGINAL_MEMORY_GROUP_AWARE_TMPP;
}
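To make the cutoff concrete, here is a minimal sketch of how the heuristic classifies sample overrides. It assumes the _KiB/_MiB literals from base/units.h and the ORIGINAL_MEMORY_GROUP_AWARE_TMPP constant introduced in this commit (cutoff: 2 * 200_KiB = 400_KiB); the sample values are illustrative.

// Sketch: classification against the 400_KiB cutoff.
static_assert(
  128_KiB < 2 * ORIGINAL_MEMORY_GROUP_AWARE_TMPP,
  "new-style override -> memory group aware rules");
static_assert(
  !(4_MiB < 2 * ORIGINAL_MEMORY_GROUP_AWARE_TMPP),
  "old 4 MiB default -> legacy rules");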

std::error_code partition_allocator::check_memory_limits(
uint64_t new_partitions_replicas_requested,
uint64_t proposed_total_partitions,
uint64_t effective_cluster_memory) const {
// Refuse to create partitions that would violate the configured
// memory per partition.
auto memory_per_partition_replica
= config::shard_local_cfg().topic_memory_per_partition();

if (!memory_per_partition_replica.has_value()) {
return errc::success;
}

bool is_memory_aware = guess_is_memory_group_aware_memory_limit();

uint64_t partition_memory;
if (is_memory_aware) {
partition_memory = effective_cluster_memory
* memory_groups().partitions_max_memory_share();
} else {
partition_memory = effective_cluster_memory;
}

uint64_t memory_limit = partition_memory
/ memory_per_partition_replica.value();

if (proposed_total_partitions > memory_limit) {
vlog(
clusterlog.warn,
"Refusing to create {} new partitions as total partition count "
"{} "
"would exceed memory limit of {} partitions. Cluster partition "
"memory: {} - "
"required memory per partition: {} - memory group aware: {}",
new_partitions_replicas_requested,
proposed_total_partitions,
memory_limit,
partition_memory,
memory_per_partition_replica.value(),
is_memory_aware);
return errc::topic_invalid_partitions_memory_limit;
}

return errc::success;
}
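As a worked example of the check above (a sketch: the 10% partition share is an assumed figure; the real fraction comes from memory_groups().partitions_max_memory_share()), take 32 GiB of effective cluster memory and the 200 KiB default:

#include <cinttypes>
#include <cstdint>
#include <cstdio>

int main() {
    uint64_t cluster_memory = 32ULL << 30; // 32 GiB effective cluster memory
    uint64_t tmpp = 200ULL << 10;          // 200 KiB per partition replica
    // Memory-group-aware path: only the reserved share is divided.
    uint64_t aware = static_cast<uint64_t>(cluster_memory * 0.10) / tmpp;
    // Legacy path: the whole memory space is divided.
    uint64_t legacy = cluster_memory / tmpp;
    std::printf("aware: %" PRIu64 ", legacy: %" PRIu64 "\n", aware, legacy);
    // Prints: aware: 16777, legacy: 167772
}

This is why the guess matters: misclassifying an old 4 MiB override as memory group aware would shrink the allowed partition count by roughly the reserved share.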

/**
* Check cluster-wide limits on total partition count vs available
* system resources. This is the 'sanity' check that the user doesn't
@@ -189,26 +282,13 @@ std::error_code partition_allocator::check_cluster_limits(
return errc::topic_invalid_partitions_core_limit;
}

// Refuse to create partitions that would violate the configured
// memory per partition.
auto memory_per_partition_replica = _memory_per_partition();
if (
memory_per_partition_replica.has_value()
&& memory_per_partition_replica.value() > 0) {
const uint64_t memory_limit = effective_cluster_memory
/ memory_per_partition_replica.value();

if (proposed_total_partitions > memory_limit) {
vlog(
clusterlog.warn,
"Refusing to create {} new partitions as total partition count "
"{} "
"would exceed memory limit {}",
new_partitions_replicas_requested,
proposed_total_partitions,
memory_limit);
return errc::topic_invalid_partitions_memory_limit;
}
auto memory_errc = check_memory_limits(
new_partitions_replicas_requested,
proposed_total_partitions,
effective_cluster_memory);

if (memory_errc != errc::success) {
return memory_errc;
}

// Refuse to create partitions that would exhaust our nfiles ulimit
8 changes: 6 additions & 2 deletions src/v/cluster/scheduling/partition_allocator.h
@@ -34,7 +34,6 @@ class partition_allocator {
partition_allocator(
ss::sharded<members_table>&,
ss::sharded<features::feature_table>&,
config::binding<std::optional<size_t>> memory_per_partition,
config::binding<std::optional<int32_t>> fds_per_partition,
config::binding<uint32_t> partitions_per_shard,
config::binding<uint32_t> partitions_reserve_shard0,
@@ -149,6 +148,12 @@
const uint64_t new_partitions_replicas_requested,
const model::topic_namespace& topic) const;

// sub-routine of the above, checks available memory
std::error_code check_memory_limits(
uint64_t new_partitions_replicas_requested,
uint64_t proposed_total_partitions,
uint64_t effective_cluster_memory) const;

ss::future<result<allocation_units::pointer>>
do_allocate(allocation_request);

@@ -164,7 +169,6 @@
ss::sharded<members_table>& _members;
features::feature_table& _feature_table;

config::binding<std::optional<size_t>> _memory_per_partition;
config::binding<std::optional<int32_t>> _fds_per_partition;
config::binding<uint32_t> _partitions_per_shard;
config::binding<uint32_t> _partitions_reserve_shard0;
32 changes: 32 additions & 0 deletions src/v/cluster/scheduling/topic_memory_per_partition_default.h
@@ -0,0 +1,32 @@
/*
* Copyright 2024 Redpanda Data, Inc.
*
* Use of this software is governed by the Business Source License
* included in the file licenses/BSL.md
*
* As of the Change Date specified in that file, in accordance with
* the Business Source License, use of this software will be governed
* by the Apache License, Version 2.0
*/
#pragma once

#include "base/units.h"

#include <cstddef>

namespace cluster {

// Default value for `topic_memory_per_partition`. Kept in a constant here
// so that it can be referred to more easily.
inline constexpr size_t DEFAULT_TOPIC_MEMORY_PER_PARTITION = 200_KiB;

// DO NOT CHANGE
//
// Original default for `topic_memory_per_partition` when we made it memory
// group aware. This property is used for heuristics to determine if the user
// had overridden the default when it was non-memory group aware. Hence, this
// value should NOT be changed and stay the same even if the (above) default is
// changed.
inline constexpr size_t ORIGINAL_MEMORY_GROUP_AWARE_TMPP = 200_KiB;

} // namespace cluster
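Because only the first constant may ever change, a hypothetical compile-time guard (illustrative, not part of this commit) could document the freeze:

// Hypothetical: records that the heuristic anchor stays frozen even if
// DEFAULT_TOPIC_MEMORY_PER_PARTITION is changed later.
static_assert(
  ORIGINAL_MEMORY_GROUP_AWARE_TMPP == 200_KiB,
  "ORIGINAL_MEMORY_GROUP_AWARE_TMPP must never change");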
4 changes: 4 additions & 0 deletions src/v/cluster/tests/BUILD
@@ -323,13 +323,17 @@ redpanda_cc_btest(
"partition_allocator_tests.cc",
],
deps = [
"//src/v/base",
"//src/v/cluster",
"//src/v/cluster:topic_memory_per_partition_default",
"//src/v/cluster/tests:partition_allocator_fixture",
"//src/v/config",
"//src/v/model",
"//src/v/raft",
"//src/v/raft:fundamental",
"//src/v/random:fast_prng",
"//src/v/random:generators",
"//src/v/resource_mgmt:memory_groups",
"//src/v/test_utils:seastar_boost",
"@abseil-cpp//absl/container:flat_hash_set",
"@boost//:test",
23 changes: 15 additions & 8 deletions src/v/cluster/tests/partition_allocator_fixture.h
@@ -28,13 +28,14 @@

#include <seastar/core/chunked_fifo.hh>

#include <cstdint>
#include <limits>

struct partition_allocator_fixture {
static constexpr uint32_t partitions_per_shard = 1000;
static constexpr uint32_t gb_per_core = 5;

partition_allocator_fixture()
: partition_allocator_fixture(std::nullopt, std::nullopt) {}
: partition_allocator_fixture(std::nullopt, std::nullopt, 1000) {}

~partition_allocator_fixture() {
_allocator.stop().get();
@@ -53,7 +54,7 @@ struct partition_allocator_fixture {
std::move(rack),
model::broker_properties{
.cores = core_count,
.available_memory_gb = 5 * core_count,
.available_memory_gb = gb_per_core * core_count,
.available_disk_gb = 10 * core_count});

auto ec = members.local().apply(
@@ -142,27 +143,32 @@

fast_prng prng;

uint32_t partitions_per_shard;

protected:
explicit partition_allocator_fixture(
std::optional<size_t> memory_per_partition,
std::optional<int32_t> fds_per_partition) {
std::optional<int32_t> fds_per_partition,
uint32_t partitions_per_shard)
: partitions_per_shard(partitions_per_shard) {
members.start().get();
features.start().get();
_allocator
.start_single(
std::ref(members),
std::ref(features),
config::mock_binding<std::optional<size_t>>(memory_per_partition),
config::mock_binding<std::optional<int32_t>>(fds_per_partition),
config::mock_binding<uint32_t>(uint32_t{partitions_per_shard}),
partitions_reserve_shard0.bind(),
kafka_internal_topics.bind(),
config::mock_binding<bool>(true))
.get();
ss::smp::invoke_on_all([] {
ss::smp::invoke_on_all([memory_per_partition] {
config::shard_local_cfg()
.get("partition_autobalancing_mode")
.set_value(model::partition_autobalancing_mode::node_add);
config::shard_local_cfg().topic_memory_per_partition.set_value(
memory_per_partition);
}).get();
}
};
@@ -172,7 +178,8 @@ struct partition_allocator_memory_limited_fixture
static constexpr size_t memory_per_partition
= std::numeric_limits<size_t>::max();
partition_allocator_memory_limited_fixture()
: partition_allocator_fixture(memory_per_partition, std::nullopt) {}
: partition_allocator_fixture(memory_per_partition, std::nullopt, 10000) {
}
};

struct partition_allocator_fd_limited_fixture
@@ -181,5 +188,5 @@ struct partition_allocator_fd_limited_fixture
= std::numeric_limits<int32_t>::max();

partition_allocator_fd_limited_fixture()
: partition_allocator_fixture(std::nullopt, fds_per_partition) {}
: partition_allocator_fixture(std::nullopt, fds_per_partition, 10000) {}
};
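For completeness, a hypothetical derived fixture in the same pattern as the two above (a sketch: the name and the 128_KiB value are illustrative, and the _KiB literal assumes base/units.h is available) could pin a new-style override to exercise the memory-group-aware path:

struct partition_allocator_small_tmpp_fixture
  : public partition_allocator_fixture {
    // Hypothetical: small, new-style topic_memory_per_partition override
    // plus an explicit partitions-per-shard count.
    partition_allocator_small_tmpp_fixture()
      : partition_allocator_fixture(128_KiB, std::nullopt, 1000) {}
};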