Skip to content

Commit

Permalink
Merge pull request #24864 from mmaslankaprv/append-entries-buffer-obs…
Browse files Browse the repository at this point in the history
…ervability

Small improvements in Raft
  • Loading branch information
mmaslankaprv authored Jan 21, 2025
2 parents 3e2d782 + 8f6c1a7 commit 591a569
Show file tree
Hide file tree
Showing 7 changed files with 45 additions and 25 deletions.
1 change: 1 addition & 0 deletions src/v/raft/append_entries_buffer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@ ss::future<> append_entries_buffer::do_flush(
bool needs_flush = false;
reply_list_t replies;
auto f = ss::now();
_consensus._probe->append_entries_buffer_flush();
{
ssx::semaphore_units op_lock_units = std::move(u);
replies.reserve(requests.size());
Expand Down
5 changes: 2 additions & 3 deletions src/v/raft/consensus.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1937,6 +1937,7 @@ ss::future<vote_reply> consensus::do_vote(vote_request r) {
ss::future<append_entries_reply>
consensus::append_entries(append_entries_request&& r) {
return with_gate(_bg, [this, r = std::move(r)]() mutable {
_probe->append_request();
return _append_requests_buffer.enqueue(std::move(r));
});
}
Expand All @@ -1956,8 +1957,6 @@ consensus::do_append_entries(append_entries_request&& r) {
reply.may_recover = _follower_recovery_state
&& _follower_recovery_state->is_active();

_probe->append_request();

if (unlikely(is_request_target_node_invalid("append_entries", r))) {
co_return reply;
}
Expand Down Expand Up @@ -3251,7 +3250,7 @@ std::ostream& operator<<(std::ostream& o, const consensus& c) {
return o;
}

group_configuration consensus::config() const {
const group_configuration& consensus::config() const {
return _configuration_manager.get_latest();
}

Expand Down
2 changes: 1 addition & 1 deletion src/v/raft/consensus.h
Original file line number Diff line number Diff line change
Expand Up @@ -205,7 +205,7 @@ class consensus {
protocol_metadata meta() const;
raft::group_id group() const { return _group; }
model::term_id term() const { return _term; }
group_configuration config() const;
const group_configuration& config() const;
const model::ntp& ntp() const { return _log->config().ntp(); }
clock_type::time_point last_heartbeat() const { return _hbeat; };
clock_type::time_point became_leader_at() const {
Expand Down
5 changes: 5 additions & 0 deletions src/v/raft/probe.cc
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,11 @@ void probe::setup_metrics(const model::ntp& ntp) {
sm::description("Number of append entries requests that failed the "
"offset translator consistency check"),
labels),
sm::make_counter(
"append_entries_buffer_flushes",
[this] { return _append_entries_buffer_flush; },
sm::description("Number of append entries buffer flushes"),
labels),
},
{},
{sm::shard_label, sm::label("partition")});
Expand Down
3 changes: 2 additions & 1 deletion src/v/raft/probe.h
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ class probe {
void offset_translator_inconsistency_error() {
++_offset_translator_inconsistency_error;
}

void append_entries_buffer_flush() { ++_append_entries_buffer_flush; }
void clear() {
_metrics.clear();
_public_metrics.clear();
Expand All @@ -93,6 +93,7 @@ class probe {
uint64_t _full_heartbeat_requests = 0;
uint64_t _lw_heartbeat_requests = 0;
uint64_t _offset_translator_inconsistency_error = 0;
uint64_t _append_entries_buffer_flush = 0;
metrics::internal_metric_groups _metrics;
metrics::public_metric_groups _public_metrics;
};
Expand Down
52 changes: 33 additions & 19 deletions src/v/raft/service.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
#include "raft/consensus.h"
#include "raft/raftgen_service.h"
#include "raft/types.h"
#include "ssx/async_algorithm.h"

#include <seastar/core/chunked_fifo.hh>
#include <seastar/core/sharded.hh>
Expand Down Expand Up @@ -137,16 +138,24 @@ class service final : public raftgen_service {
= co_await ss::when_all_succeed(futures.begin(), futures.end());

heartbeat_reply_v2 reply(_self, source);

ssx::async_counter cnt;
// flatten responses
for (shard_heartbeat_replies& shard_replies : replies) {
for (const auto& lw_reply : shard_replies.lw_replies) {
reply.add(lw_reply.group, lw_reply.result);
}
for (const auto& full_reply : shard_replies.full_heartbeats) {
reply.add(full_reply.group, full_reply.result, full_reply.data);
}
co_await ss::coroutine::maybe_yield();
co_await ssx::async_for_each_counter(
cnt,
shard_replies.lw_replies.begin(),
shard_replies.lw_replies.end(),
[&reply](lw_reply& lw_reply) {
reply.add(lw_reply.group, lw_reply.result);
});
co_await ssx::async_for_each_counter(
cnt,
shard_replies.full_heartbeats.begin(),
shard_replies.full_heartbeats.end(),
[&reply](full_heartbeat_reply& full_reply) {
reply.add(
full_reply.group, full_reply.result, full_reply.data);
});
}

for (auto& m : group_missing_replies) {
Expand All @@ -169,6 +178,7 @@ class service final : public raftgen_service {
}

[[gnu::always_inline]] ss::future<append_entries_reply>

append_entries(append_entries_request r, rpc::streaming_context&) final {
return _probe.append_entries().then([this, r = std::move(r)]() mutable {
auto gr = r.target_group();
Expand Down Expand Up @@ -424,20 +434,24 @@ class service final : public raftgen_service {
model::node_id target_node,
shard_heartbeats reqs) {
shard_heartbeat_replies replies;
replies.lw_replies.reserve(reqs.lw_heartbeats.size());
replies.full_heartbeats.reserve(reqs.full_heartbeats.size());
/**
* Dispatch lightweight heartbeats
*/
for (const auto& gr : reqs.lw_heartbeats) {
auto c = m.consensus_for(gr);
if (unlikely(!c)) {
replies.lw_replies.emplace_back(
gr, reply_result::group_unavailable);
continue;
}
auto result = c->lightweight_heartbeat(source_node, target_node);
replies.lw_replies.emplace_back(gr, result);
co_await ss::coroutine::maybe_yield();
}
co_await ssx::async_for_each(
reqs.lw_heartbeats.begin(),
reqs.lw_heartbeats.end(),
[&m, &replies, source_node, target_node](group_id gr) {
auto c = m.consensus_for(gr);
if (unlikely(!c)) {
replies.lw_replies.emplace_back(
gr, reply_result::group_unavailable);
return;
}
auto result = c->lightweight_heartbeat(source_node, target_node);
replies.lw_replies.emplace_back(gr, result);
});

std::vector<ss::future<full_heartbeat_reply>> futures;
const auto timeout = clock_type::now() + _heartbeat_interval;
Expand Down
2 changes: 1 addition & 1 deletion src/v/raft/tests/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -580,7 +580,7 @@ redpanda_cc_gtest(
"raft_reconfiguration_test.cc",
],
cpu = 4,
memory = "4GiB",
memory = "6GiB",
tags = ["exclusive"],
deps = [
"//src/v/base",
Expand Down

0 comments on commit 591a569

Please sign in to comment.