Skip to content

Commit

Permalink
r/consensus: used chunked_vector in raft::replicate
Browse files Browse the repository at this point in the history
When replicating batches in Redpanda we never use the capabilities of
the record batch reader. The reader only introduces an additional burden,
as it needs to be converted to a vector of batches for batching and
handling. Changed the interface to accept
`chunked_vector<model::record_batch>` or a single batch instead of using
readers.

Signed-off-by: Michał Maślanka <[email protected]>
  • Loading branch information
mmaslankaprv committed Jan 3, 2025
1 parent 6139187 commit 34ae66c
Show file tree
Hide file tree
Showing 47 changed files with 429 additions and 476 deletions.
4 changes: 1 addition & 3 deletions src/v/cloud_topics/dl_stm/dl_stm_api.cc
Original file line number Diff line number Diff line change
Expand Up @@ -127,11 +127,9 @@ ss::future<checked<model::offset, dl_stm_api_errc>>
dl_stm_api::replicated_apply(model::record_batch&& batch) {
model::term_id term = _stm->_raft->term();

auto reader = model::make_memory_record_batch_reader(std::move(batch));

auto opts = raft::replicate_options(raft::consistency_level::quorum_ack);
opts.set_force_flush();
auto res = co_await _stm->_raft->replicate(term, std::move(reader), opts);
auto res = co_await _stm->_raft->replicate(term, std::move(batch), opts);

if (res.has_error()) {
throw std::runtime_error(
Expand Down
4 changes: 1 addition & 3 deletions src/v/cluster/archival/archival_metadata_stm.cc
Original file line number Diff line number Diff line change
Expand Up @@ -895,9 +895,7 @@ ss::future<std::error_code> archival_metadata_stm::do_replicate_commands(
opts.set_force_flush();

auto result = co_await _raft->replicate(
current_term,
model::make_memory_record_batch_reader(std::move(batch)),
opts);
current_term, std::move(batch), opts);
if (!result) {
vlog(
_logger.warn,
Expand Down
2 changes: 1 addition & 1 deletion src/v/cluster/archival/tests/archival_service_fixture.h
Original file line number Diff line number Diff line change
Expand Up @@ -568,7 +568,7 @@ class archiver_cluster_fixture
result<raft::replicate_result> res
= partition->raft()
->replicate(
model::make_memory_record_batch_reader(std::move(batches)),
chunked_vector<model::record_batch>(std::move(batches)),
raft::replicate_options(acks))
.get();

Expand Down
5 changes: 1 addition & 4 deletions src/v/cluster/distributed_kv_stm.h
Original file line number Diff line number Diff line change
Expand Up @@ -399,12 +399,9 @@ class distributed_kv_stm final : public raft::persisted_stm<> {
}

ss::future<errc> replicate_and_wait(simple_batch_builder builder) {
auto batch = std::move(builder).build();
auto reader = model::make_memory_record_batch_reader(std::move(batch));

auto r = co_await _raft->replicate(
_insync_term,
std::move(reader),
std::move(builder).build(),
raft::replicate_options(raft::consistency_level::quorum_ack));

if (!r) {
Expand Down
3 changes: 1 addition & 2 deletions src/v/cluster/id_allocator_stm.cc
Original file line number Diff line number Diff line change
Expand Up @@ -104,10 +104,9 @@ ss::future<bool> id_allocator_stm::set_state(
int64_t value, model::timeout_clock::duration timeout) {
auto batch = serialize_cmd(
state_cmd{.next_state = value}, model::record_batch_type::id_allocator);
auto reader = model::make_memory_record_batch_reader(std::move(batch));
auto r = co_await _raft->replicate(
_insync_term,
std::move(reader),
std::move(batch),
raft::replicate_options(raft::consistency_level::quorum_ack));
if (!r) {
co_return false;
Expand Down
5 changes: 1 addition & 4 deletions src/v/cluster/log_eviction_stm.cc
Original file line number Diff line number Diff line change
Expand Up @@ -330,10 +330,7 @@ ss::future<log_eviction_stm::offset_result> log_eviction_stm::replicate_command(
std::optional<std::reference_wrapper<ss::abort_source>> as) {
auto opts = raft::replicate_options(raft::consistency_level::quorum_ack);
opts.set_force_flush();
auto fut = _raft->replicate(
_raft->term(),
model::make_memory_record_batch_reader(std::move(batch)),
opts);
auto fut = _raft->replicate(_raft->term(), std::move(batch), opts);

/// Execute the replicate command bound by timeout and cancellable via
/// abort_source mechanism
Expand Down
17 changes: 8 additions & 9 deletions src/v/cluster/migrations/tx_manager_migrator.cc
Original file line number Diff line number Diff line change
Expand Up @@ -222,7 +222,7 @@ ss::future<tx_manager_read_reply> tx_manager_read_handler::do_read(
reader_cfg.max_bytes = 128_KiB;
auto reader = co_await partition->make_reader(std::move(reader_cfg));

auto batches = co_await model::consume_reader_to_fragmented_memory(
auto batches = co_await model::consume_reader_to_chunked_vector(
std::move(reader), model::no_timeout);

co_return tx_manager_read_reply(
Expand All @@ -233,7 +233,7 @@ ss::future<tx_manager_replicate_reply>
tx_manager_replicate_handler::do_replicate(
partition_manager& pm,
model::ntp ntp,
fragmented_vector<model::record_batch> batches) {
chunked_vector<model::record_batch> batches) {
vlog(
logger.info,
"Requested replication of {} batches to {}",
Expand All @@ -245,7 +245,7 @@ tx_manager_replicate_handler::do_replicate(
}

auto r = co_await partition->replicate(
model::make_fragmented_memory_record_batch_reader(std::move(batches)),
std::move(batches),
raft::replicate_options(raft::consistency_level::quorum_ack));

if (r.has_error()) {
Expand Down Expand Up @@ -549,17 +549,16 @@ tx_manager_migrator::rehash_and_write_partition_data(
std::move(source_ntp),
default_timeout,
_as,
[this,
source_partition_id](fragmented_vector<model::record_batch> batches) {
[this, source_partition_id](chunked_vector<model::record_batch> batches) {
return rehash_chunk(source_partition_id, std::move(batches));
});
}

ss::future<std::error_code> tx_manager_migrator::rehash_chunk(
model::partition_id source_partition_id,
fragmented_vector<model::record_batch> batches) {
chunked_vector<model::record_batch> batches) {
absl::
flat_hash_map<model::partition_id, fragmented_vector<model::record_batch>>
flat_hash_map<model::partition_id, chunked_vector<model::record_batch>>
target_partition_batches;
target_partition_batches.reserve(_requested_partition_count);
for (auto& b : batches) {
Expand Down Expand Up @@ -632,8 +631,8 @@ tx_manager_migrator::copy_from_temporary_to_tx_manager_topic(
default_timeout,
_as,
[this, target_ntp = std::move(target_ntp)](
fragmented_vector<model::record_batch> all_batches) {
fragmented_vector<model::record_batch> tx_batches;
chunked_vector<model::record_batch> all_batches) {
chunked_vector<model::record_batch> tx_batches;
for (auto& b : all_batches) {
if (b.header().type == model::record_batch_type::tm_update) {
tx_batches.push_back(std::move(b));
Expand Down
4 changes: 2 additions & 2 deletions src/v/cluster/migrations/tx_manager_migrator.h
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ class tx_manager_replicate_handler {

private:
ss::future<tx_manager_replicate_reply> do_replicate(
partition_manager&, model::ntp, fragmented_vector<model::record_batch>);
partition_manager&, model::ntp, chunked_vector<model::record_batch>);

ss::sharded<partition_manager>& _partition_manager;
};
Expand Down Expand Up @@ -198,7 +198,7 @@ class tx_manager_migrator {
rehash_and_write_partition_data(model::partition_id source_partition_id);
ss::future<std::error_code> rehash_chunk(
model::partition_id source_partition_id,
fragmented_vector<model::record_batch> batches);
chunked_vector<model::record_batch> batches);

ss::future<std::error_code>
copy_from_temporary_to_tx_manager_topic(model::partition_id);
Expand Down
8 changes: 4 additions & 4 deletions src/v/cluster/migrations/tx_manager_migrator_types.h
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ struct tx_manager_read_reply
: ec(ec) {}

tx_manager_read_reply(
fragmented_vector<model::record_batch> batches,
chunked_vector<model::record_batch> batches,
model::offset log_dirty_offset)
: ec(errc::success)
, batches(std::move(batches))
Expand All @@ -75,7 +75,7 @@ struct tx_manager_read_reply
auto serde_fields() { return std::tie(ec, batches, log_dirty_offset); }

errc ec;
fragmented_vector<model::record_batch> batches;
chunked_vector<model::record_batch> batches;
model::offset log_dirty_offset;

friend bool operator==(
Expand All @@ -96,7 +96,7 @@ struct tx_manager_replicate_request
tx_manager_replicate_request() = default;

tx_manager_replicate_request(
model::ntp ntp, fragmented_vector<model::record_batch> batches)
model::ntp ntp, chunked_vector<model::record_batch> batches)
: ntp(std::move(ntp))
, batches(std::move(batches)) {}

Expand All @@ -121,7 +121,7 @@ struct tx_manager_replicate_request
auto serde_fields() { return std::tie(ntp, batches); }

model::ntp ntp;
fragmented_vector<model::record_batch> batches;
chunked_vector<model::record_batch> batches;

friend bool operator==(
const tx_manager_replicate_request& lhs,
Expand Down
15 changes: 8 additions & 7 deletions src/v/cluster/partition.cc
Original file line number Diff line number Diff line change
Expand Up @@ -338,15 +338,15 @@ ss::future<storage::translating_reader> partition::make_cloud_reader(
}

ss::future<result<kafka_result>> partition::replicate(
model::record_batch_reader&& r, raft::replicate_options opts) {
chunked_vector<model::record_batch> batches, raft::replicate_options opts) {
using ret_t = result<kafka_result>;
auto reader = std::move(r);

auto maybe_units = co_await hold_writes_enabled();
if (!maybe_units) {
co_return ret_t(maybe_units.error());
}
auto res = co_await _raft->replicate(std::move(reader), opts);

auto res = co_await _raft->replicate(std::move(batches), opts);
if (!res) {
co_return ret_t(res.error());
}
Expand Down Expand Up @@ -402,7 +402,7 @@ kafka_stages stages_with_units(

kafka_stages partition::replicate_in_stages(
model::batch_identity bid,
model::record_batch_reader&& r,
chunked_vector<model::record_batch> batches,
raft::replicate_options opts) {
using ret_t = result<kafka_result>;

Expand Down Expand Up @@ -430,12 +430,13 @@ kafka_stages partition::replicate_in_stages(
hold_writes_enabled(),
[this,
bid = std::move(bid),
r = std::move(r),
batches = std::move(batches),
opts = std::move(opts)]() mutable {
if (_rm_stm) {
return _rm_stm->replicate_in_stages(bid, std::move(r), opts);
return _rm_stm->replicate_in_stages(
bid, std::move(batches), opts);
}
auto res = _raft->replicate_in_stages(std::move(r), opts);
auto res = _raft->replicate_in_stages(std::move(batches), opts);
auto replicate_finished = res.replicate_finished.then(
[this](result<raft::replicate_result> r) {
if (!r) {
Expand Down
6 changes: 3 additions & 3 deletions src/v/cluster/partition.h
Original file line number Diff line number Diff line change
Expand Up @@ -80,8 +80,8 @@ class partition : public ss::enable_lw_shared_from_this<partition> {
/// after a configuration change.
void maybe_construct_archiver();

ss::future<result<kafka_result>>
replicate(model::record_batch_reader&&, raft::replicate_options);
ss::future<result<kafka_result>> replicate(
chunked_vector<model::record_batch> batches, raft::replicate_options);

/// Truncate the beginning of the log up until a given offset
/// Can only be performed on logs that are deletable and non internal
Expand All @@ -90,7 +90,7 @@ class partition : public ss::enable_lw_shared_from_this<partition> {

kafka_stages replicate_in_stages(
model::batch_identity,
model::record_batch_reader&&,
chunked_vector<model::record_batch> batches,
raft::replicate_options);

/**
Expand Down
5 changes: 1 addition & 4 deletions src/v/cluster/partition_properties_stm.cc
Original file line number Diff line number Diff line change
Expand Up @@ -210,10 +210,7 @@ partition_properties_stm::replicate_properties_update(
raft::consistency_level::quorum_ack,
std::chrono::milliseconds(timeout / 1ms));
r_opts.set_force_flush();
auto r = co_await _raft->replicate(
_insync_term,
model::make_memory_record_batch_reader(std::move(b)),
r_opts);
auto r = co_await _raft->replicate(_insync_term, std::move(b), r_opts);

if (r.has_error()) {
vlog(
Expand Down
Loading

0 comments on commit 34ae66c

Please sign in to comment.