kafka/server: add metrics and config for consumer lag reporting #24977

Open · wants to merge 3 commits into base: dev
7 changes: 7 additions & 0 deletions src/v/config/configuration.cc
@@ -582,6 +582,13 @@ configuration::configuration()
"sense by the shard and/or partition labels.",
{.needs_restart = needs_restart::no},
false)
, enable_consumer_group_lag_metrics(
*this,
"enable_consumer_group_lag_metrics",
"Enable registering metrics for consumer group lag exposed on "
"the `/public_metrics` endpoint.",
{.needs_restart = needs_restart::no},
false)
, group_min_session_timeout_ms(
*this,
"group_min_session_timeout_ms",
1 change: 1 addition & 0 deletions src/v/config/configuration.h
@@ -196,6 +196,7 @@ struct configuration final : public config_store {
property<bool> disable_metrics;
property<bool> disable_public_metrics;
property<bool> aggregate_metrics;
property<bool> enable_consumer_group_lag_metrics;
property<std::chrono::milliseconds> group_min_session_timeout_ms;
property<std::chrono::milliseconds> group_max_session_timeout_ms;
property<std::chrono::milliseconds> group_initial_rebalance_delay;
33 changes: 25 additions & 8 deletions src/v/kafka/server/group.cc
@@ -124,19 +124,18 @@ group::group(
, _conf(conf)
, _catchup_lock(std::move(catchup_lock))
, _partition(std::move(partition))
, _probe(_members, _static_members, _offsets)
, _probe(_members, _static_members, _offsets, _lag_metrics)
, _ctxlog(klog, *this)
, _ctx_txlog(cluster::txlog, *this)
, _md_serializer(std::move(serializer))
, _term(term)
, _enable_group_metrics(group_metrics)
, _enable_consumer_lag_metrics(conf.enable_consumer_group_lag_metrics.bind())
, _abort_interval_ms(config::shard_local_cfg()
.abort_timed_out_transactions_interval_ms.value())
, _tx_frontend(tx_frontend)
, _feature_table(feature_table) {
if (_enable_group_metrics) {
_probe.setup_public_metrics(_id);
}
setup_metrics();

start_abort_timer();
}
@@ -167,12 +166,13 @@ group::group(
, _conf(conf)
, _catchup_lock(std::move(catchup_lock))
, _partition(std::move(partition))
, _probe(_members, _static_members, _offsets)
, _probe(_members, _static_members, _offsets, _lag_metrics)
, _ctxlog(klog, *this)
, _ctx_txlog(cluster::txlog, *this)
, _md_serializer(std::move(serializer))
, _term(term)
, _enable_group_metrics(group_metrics)
, _enable_consumer_lag_metrics(conf.enable_consumer_group_lag_metrics.bind())
, _abort_interval_ms(config::shard_local_cfg()
.abort_timed_out_transactions_interval_ms.value())
, _tx_frontend(tx_frontend)
@@ -193,9 +193,7 @@ group::group(
// update when restoring from metadata value
update_subscriptions();

if (_enable_group_metrics) {
_probe.setup_public_metrics(_id);
}
setup_metrics();

start_abort_timer();
}
@@ -3620,4 +3618,23 @@ group::delete_offsets(std::vector<model::topic_partition> offsets) {
return deleted_offsets;
}

void group::setup_metrics() {
if (!_enable_group_metrics) {
return;
}

_enable_consumer_lag_metrics.watch([this]() {
if (_enable_consumer_lag_metrics()) {
_probe.register_consumer_lag_metrics(_id);
} else {
_probe.deregister_consumer_lag_metrics();
}
});

_probe.setup_public_metrics(_id);
if (_enable_consumer_lag_metrics()) {
_probe.register_consumer_lag_metrics(_id);
}
}

} // namespace kafka
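
For context on the `setup_metrics()` change above: the `config::binding` obtained from `.bind()` tracks the live value of a cluster property, and `.watch()` runs a callback whenever that value changes, which is why the lag metrics can be toggled without a restart. A minimal standalone sketch of that pattern, using a hypothetical `bool_binding` type rather than Redpanda's actual config framework:

```cpp
#include <functional>
#include <iostream>
#include <vector>

// Minimal stand-in for a live-reloadable boolean property: holds the current
// value and notifies registered watchers whenever the value is updated.
class bool_binding {
public:
    explicit bool_binding(bool initial) : _value(initial) {}

    // Read the current value, like `_enable_consumer_lag_metrics()`.
    bool operator()() const { return _value; }

    // Register a callback fired on every change, like `.watch(...)`.
    void watch(std::function<void()> cb) { _watchers.push_back(std::move(cb)); }

    // Simulates the cluster config being updated at runtime (no restart).
    void update(bool new_value) {
        if (new_value == _value) {
            return;
        }
        _value = new_value;
        for (auto& cb : _watchers) {
            cb();
        }
    }

private:
    bool _value;
    std::vector<std::function<void()>> _watchers;
};

int main() {
    bool_binding enable_lag_metrics{false};

    // Mirrors group::setup_metrics(): register/deregister as the flag flips.
    enable_lag_metrics.watch([&] {
        std::cout << (enable_lag_metrics() ? "register lag metrics\n"
                                           : "deregister lag metrics\n");
    });

    enable_lag_metrics.update(true);  // prints "register lag metrics"
    enable_lag_metrics.update(false); // prints "deregister lag metrics"
}
```
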
4 changes: 4 additions & 0 deletions src/v/kafka/server/group.h
@@ -922,6 +922,8 @@ class group final : public ss::enable_lw_shared_from_this<group> {

cluster::tx::errc map_tx_replication_error(std::error_code ec);

void setup_metrics();

kafka::group_id _id;
group_state _state;
std::optional<model::timestamp> _state_timestamp;
@@ -944,6 +946,7 @@
model::topic_partition,
std::unique_ptr<offset_metadata_with_probe>>
_offsets;
consumer_lag_metrics _lag_metrics;
group_probe<
model::topic_partition,
std::unique_ptr<offset_metadata_with_probe>>
@@ -965,6 +968,7 @@
chunked_hash_map<model::topic_partition, offset_metadata>
_pending_offset_commits;
enable_group_metrics _enable_group_metrics;
config::binding<bool> _enable_consumer_lag_metrics;
Contributor review comment: nit: you aren't using or assigning this in this commit so it should be moved to a different commit

ss::gate _gate;
ss::timer<clock_type> _auto_abort_timer;
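
The new `_lag_metrics` member above only declares the storage that the gauges read; these commits do not yet include the code that populates `sum` and `max` (and, as the review comment notes, the new binding is not used here yet either). Purely as an illustration of what a follow-up commit might do, and not part of this PR, a per-group aggregation could sum and take the maximum of each partition's high watermark minus its committed offset; `lag_totals` and `aggregate_lag` below are hypothetical names:

```cpp
#include <algorithm>
#include <cstdint>
#include <iostream>
#include <map>
#include <string>

// Hypothetical aggregation, not part of this PR: compute the sum and max of
// (high watermark - committed offset) across a group's partitions.
struct lag_totals {
    uint64_t sum = 0;
    uint64_t max = 0;
};

lag_totals aggregate_lag(
  const std::map<std::string, int64_t>& committed,        // "topic/partition" -> committed offset
  const std::map<std::string, int64_t>& high_watermark) { // "topic/partition" -> log end
    lag_totals totals;
    for (const auto& [tp, offset] : committed) {
        auto it = high_watermark.find(tp);
        if (it == high_watermark.end()) {
            continue; // partition unknown or deleted; skip it
        }
        // Clamp at zero: a committed offset can briefly lead a cached watermark.
        auto lag = static_cast<uint64_t>(std::max<int64_t>(0, it->second - offset));
        totals.sum += lag;
        totals.max = std::max(totals.max, lag);
    }
    return totals;
}

int main() {
    std::map<std::string, int64_t> committed = {{"t/0", 90}, {"t/1", 100}};
    std::map<std::string, int64_t> hwm = {{"t/0", 120}, {"t/1", 100}};
    auto totals = aggregate_lag(committed, hwm);
    std::cout << "sum=" << totals.sum << " max=" << totals.max << "\n"; // sum=30 max=30
}
```
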
55 changes: 52 additions & 3 deletions src/v/kafka/server/group_probe.h
@@ -89,6 +89,11 @@ class group_offset_probe {
metrics::public_metric_groups _public_metrics;
};

struct consumer_lag_metrics {
size_t sum{};
size_t max{};
};

template<typename KeyType, typename ValType>
class group_probe {
using member_map = absl::node_hash_map<kafka::member_id, member_ptr>;
@@ -100,10 +105,14 @@ class group_probe {
explicit group_probe(
member_map& members,
static_member_map& static_members,
offsets_map& offsets) noexcept
offsets_map& offsets,
consumer_lag_metrics& lag_metrics) noexcept
: _members(members)
, _static_members(static_members)
, _offsets(offsets) {}
, _offsets(offsets)
, _lag_metrics(lag_metrics)
, _disable_public_metrics(
config::shard_local_cfg().disable_public_metrics()) {}

group_probe(const group_probe&) = delete;
group_probe& operator=(const group_probe&) = delete;
@@ -114,7 +123,7 @@
void setup_public_metrics(const kafka::group_id& group_id) {
namespace sm = ss::metrics;

if (config::shard_local_cfg().disable_public_metrics()) {
if (_disable_public_metrics) {
return;
}

@@ -137,11 +146,51 @@
labels)});
}

void register_consumer_lag_metrics(const kafka::group_id& group_id) {
if (_disable_public_metrics) {
return;
}

if (_public_metrics_consumer_lag.has_value()) {
return;
}

namespace sm = ss::metrics;
auto group_label = metrics::make_namespaced_label("group");
std::vector<sm::label_instance> labels{group_label(group_id())};

_public_metrics_consumer_lag.emplace();

_public_metrics_consumer_lag->add_group(
prometheus_sanitize::metrics_name("kafka:consumer:group"),
{sm::make_gauge(
"lag_sum",
[this] { return _lag_metrics.sum; },
sm::description(
"Sum of consumer group lag for all partitions in a group"),
labels),
sm::make_gauge(
"lag_max",
[this] { return _lag_metrics.max; },
sm::description(
"Maximum consumer group lag across all partitions in a group"),
labels)});
}
void deregister_consumer_lag_metrics() {
_public_metrics_consumer_lag = std::nullopt;
}

private:
member_map& _members;
static_member_map& _static_members;
offsets_map& _offsets;
consumer_lag_metrics& _lag_metrics;
metrics::public_metric_groups _public_metrics;
// A separate group is used for these metrics because they need to be
// deregisterable. To deregister the metrics, the public_metric_groups object
// must be destroyed, so a nullable type is used here.
std::optional<metrics::public_metric_groups> _public_metrics_consumer_lag;
bool _disable_public_metrics;
};

} // namespace kafka
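
The comment above explains the design: Seastar-style metric groups deregister their metrics when the group object is destroyed, so wrapping the group in `std::optional` turns `emplace()` into "register" and assignment of `std::nullopt` into "deregister". A small self-contained sketch of that lifetime-based pattern, using a hypothetical `metric_group` stand-in rather than the real `metrics::public_metric_groups`:

```cpp
#include <iostream>
#include <optional>
#include <string>

// Stand-in for a metrics group whose registration lives exactly as long as
// the object: constructing registers, destroying deregisters.
class metric_group {
public:
    explicit metric_group(std::string name) : _name(std::move(name)) {
        std::cout << "registered " << _name << "\n";
    }
    ~metric_group() { std::cout << "deregistered " << _name << "\n"; }
    metric_group(const metric_group&) = delete;
    metric_group& operator=(const metric_group&) = delete;

private:
    std::string _name;
};

class probe {
public:
    void register_lag_metrics() {
        if (_lag_group.has_value()) {
            return; // already registered, mirrors the early return above
        }
        _lag_group.emplace("consumer_group_lag");
    }
    void deregister_lag_metrics() { _lag_group = std::nullopt; }

private:
    // nullopt == not registered; destroying the contained value deregisters.
    std::optional<metric_group> _lag_group;
};

int main() {
    probe p;
    p.register_lag_metrics();   // prints "registered consumer_group_lag"
    p.deregister_lag_metrics(); // prints "deregistered consumer_group_lag"
}
```
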
60 changes: 59 additions & 1 deletion tests/rptest/tests/consumer_group_test.py
@@ -22,7 +22,7 @@
from rptest.clients.types import TopicSpec
from rptest.services.kafka_cli_consumer import KafkaCliConsumer
from rptest.services.kgo_verifier_services import KgoVerifierProducer
from rptest.services.redpanda import RESTART_LOG_ALLOW_LIST, RedpandaService
from rptest.services.redpanda import RESTART_LOG_ALLOW_LIST, RedpandaService, MetricsEndpoint
from rptest.services.rpk_producer import RpkProducer
from rptest.services.verifiable_consumer import VerifiableConsumer
from rptest.tests.redpanda_test import RedpandaTest
@@ -658,6 +658,64 @@ def test_consumer_static_member_update(self):
self.producer.wait()
self.producer.free()

@cluster(num_nodes=6)
@parametrize(enable_consumer_lag_metrics=True)
@parametrize(enable_consumer_lag_metrics=False)
def test_group_lag_metrics_dynamic_registration(
self, enable_consumer_lag_metrics):
"""
Test validating that consumer lag metrics can be dynamically enabled/disabled
"""
self.redpanda.set_cluster_config(
{"enable_consumer_group_lag_metrics": enable_consumer_lag_metrics})

self.create_topic(20)
group = 'test-gr-1'
# use 2 consumers
consumers = self.create_consumers(2,
self.topic_spec.name,
group,
static_members=False)

self.start_producer()
# wait for some messages
wait_until(
lambda: ConsumerGroupTest.group_consumed_at_least(
consumers, 50 * len(consumers)), 30, 2)
Contributor review comment on lines +681 to +684: nit: add an error message in wait_until

self.validate_group_state(group,
expected_state="Stable",
static_members=False)

def get_consumer_lag_metrics_from_nodes():
patterns = [
"redpanda_kafka_consumer_group_lag_max",
"redpanda_kafka_consumer_group_lag_sum"
]
samples = self.redpanda.metrics_samples(
patterns, self.redpanda.started_nodes(),
MetricsEndpoint.PUBLIC_METRICS)
success = samples is not None and set(
samples.keys()) == set(patterns)
return success, samples

has_consumer_lag_metrics, _ = get_consumer_lag_metrics_from_nodes()
assert has_consumer_lag_metrics == enable_consumer_lag_metrics, f"Expected value '{enable_consumer_lag_metrics}' but got '{has_consumer_lag_metrics}'"

flipped_value = not enable_consumer_lag_metrics
self.redpanda.set_cluster_config(
{"enable_consumer_group_lag_metrics": flipped_value})
has_consumer_lag_metrics, _ = get_consumer_lag_metrics_from_nodes()
assert has_consumer_lag_metrics == flipped_value, f"Expected value '{flipped_value}' but got '{has_consumer_lag_metrics}'"

self.producer.wait()
self.producer.free()

for c in consumers:
c.stop()
c.wait()
c.free()


@dataclass
class OffsetAndMetadata():