Merge pull request #24477 from mmaslankaprv/raft-chunked-vector
change the `consensus::replicate` method to accept `chunked_vector`
mmaslankaprv authored Jan 7, 2025
2 parents 1cf116c + 34ae66c commit 9e7fe8c
Showing 49 changed files with 463 additions and 476 deletions.
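
Every call site below follows the same pattern: instead of wrapping record batches in a model::record_batch_reader (via model::make_memory_record_batch_reader or model::make_fragmented_memory_record_batch_reader) before calling replicate, callers now pass a chunked_vector<model::record_batch> (or a single model::record_batch) directly. A rough sketch of the call-site shape, not a complete compilable unit; the _raft, term, batch, and opts names are assumptions taken from the diffs below:

// Before (illustrative): the batch had to be wrapped in a reader first.
//   auto reader = model::make_memory_record_batch_reader(std::move(batch));
//   auto res = co_await _raft->replicate(term, std::move(reader), opts);

// After (illustrative): hand the batches to replicate directly.
chunked_vector<model::record_batch> batches;
batches.push_back(std::move(batch)); // single batches can also be passed as-is
auto res = co_await _raft->replicate(term, std::move(batches), opts);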
4 changes: 1 addition & 3 deletions src/v/cloud_topics/dl_stm/dl_stm_api.cc
@@ -127,11 +127,9 @@ ss::future<checked<model::offset, dl_stm_api_errc>>
dl_stm_api::replicated_apply(model::record_batch&& batch) {
model::term_id term = _stm->_raft->term();

- auto reader = model::make_memory_record_batch_reader(std::move(batch));
-
auto opts = raft::replicate_options(raft::consistency_level::quorum_ack);
opts.set_force_flush();
- auto res = co_await _stm->_raft->replicate(term, std::move(reader), opts);
+ auto res = co_await _stm->_raft->replicate(term, std::move(batch), opts);

if (res.has_error()) {
throw std::runtime_error(
4 changes: 1 addition & 3 deletions src/v/cluster/archival/archival_metadata_stm.cc
@@ -895,9 +895,7 @@ ss::future<std::error_code> archival_metadata_stm::do_replicate_commands(
opts.set_force_flush();

auto result = co_await _raft->replicate(
- current_term,
- model::make_memory_record_batch_reader(std::move(batch)),
- opts);
+ current_term, std::move(batch), opts);
if (!result) {
vlog(
_logger.warn,
2 changes: 1 addition & 1 deletion src/v/cluster/archival/tests/archival_service_fixture.h
@@ -568,7 +568,7 @@ class archiver_cluster_fixture
result<raft::replicate_result> res
= partition->raft()
->replicate(
- model::make_memory_record_batch_reader(std::move(batches)),
+ chunked_vector<model::record_batch>(std::move(batches)),
raft::replicate_options(acks))
.get();

5 changes: 1 addition & 4 deletions src/v/cluster/distributed_kv_stm.h
@@ -400,12 +400,9 @@ class distributed_kv_stm final : public raft::persisted_stm<> {
}

ss::future<errc> replicate_and_wait(simple_batch_builder builder) {
- auto batch = std::move(builder).build();
- auto reader = model::make_memory_record_batch_reader(std::move(batch));
-
auto r = co_await _raft->replicate(
_insync_term,
- std::move(reader),
+ std::move(builder).build(),
raft::replicate_options(raft::consistency_level::quorum_ack));

if (!r) {
3 changes: 1 addition & 2 deletions src/v/cluster/id_allocator_stm.cc
@@ -104,10 +104,9 @@ ss::future<bool> id_allocator_stm::set_state(
int64_t value, model::timeout_clock::duration timeout) {
auto batch = serialize_cmd(
state_cmd{.next_state = value}, model::record_batch_type::id_allocator);
- auto reader = model::make_memory_record_batch_reader(std::move(batch));
auto r = co_await _raft->replicate(
_insync_term,
- std::move(reader),
+ std::move(batch),
raft::replicate_options(raft::consistency_level::quorum_ack));
if (!r) {
co_return false;
5 changes: 1 addition & 4 deletions src/v/cluster/log_eviction_stm.cc
@@ -330,10 +330,7 @@ ss::future<log_eviction_stm::offset_result> log_eviction_stm::replicate_command(
std::optional<std::reference_wrapper<ss::abort_source>> as) {
auto opts = raft::replicate_options(raft::consistency_level::quorum_ack);
opts.set_force_flush();
- auto fut = _raft->replicate(
- _raft->term(),
- model::make_memory_record_batch_reader(std::move(batch)),
- opts);
+ auto fut = _raft->replicate(_raft->term(), std::move(batch), opts);

/// Execute the replicate command bound by timeout and cancellable via
/// abort_source mechanism
17 changes: 8 additions & 9 deletions src/v/cluster/migrations/tx_manager_migrator.cc
@@ -222,7 +222,7 @@ ss::future<tx_manager_read_reply> tx_manager_read_handler::do_read(
reader_cfg.max_bytes = 128_KiB;
auto reader = co_await partition->make_reader(std::move(reader_cfg));

- auto batches = co_await model::consume_reader_to_fragmented_memory(
+ auto batches = co_await model::consume_reader_to_chunked_vector(
std::move(reader), model::no_timeout);

co_return tx_manager_read_reply(
@@ -233,7 +233,7 @@ ss::future<tx_manager_replicate_reply>
tx_manager_replicate_handler::do_replicate(
partition_manager& pm,
model::ntp ntp,
- fragmented_vector<model::record_batch> batches) {
+ chunked_vector<model::record_batch> batches) {
vlog(
logger.info,
"Requested replication of {} batches to {}",
@@ -245,7 +245,7 @@ tx_manager_replicate_handler::do_replicate(
}

auto r = co_await partition->replicate(
- model::make_fragmented_memory_record_batch_reader(std::move(batches)),
+ std::move(batches),
raft::replicate_options(raft::consistency_level::quorum_ack));

if (r.has_error()) {
@@ -549,17 +549,16 @@ tx_manager_migrator::rehash_and_write_partition_data(
std::move(source_ntp),
default_timeout,
_as,
- [this,
- source_partition_id](fragmented_vector<model::record_batch> batches) {
+ [this, source_partition_id](chunked_vector<model::record_batch> batches) {
return rehash_chunk(source_partition_id, std::move(batches));
});
}

ss::future<std::error_code> tx_manager_migrator::rehash_chunk(
model::partition_id source_partition_id,
- fragmented_vector<model::record_batch> batches) {
+ chunked_vector<model::record_batch> batches) {
absl::
- flat_hash_map<model::partition_id, fragmented_vector<model::record_batch>>
+ flat_hash_map<model::partition_id, chunked_vector<model::record_batch>>
target_partition_batches;
target_partition_batches.reserve(_requested_partition_count);
for (auto& b : batches) {
@@ -632,8 +631,8 @@ tx_manager_migrator::copy_from_temporary_to_tx_manager_topic(
default_timeout,
_as,
[this, target_ntp = std::move(target_ntp)](
- fragmented_vector<model::record_batch> all_batches) {
- fragmented_vector<model::record_batch> tx_batches;
+ chunked_vector<model::record_batch> all_batches) {
+ chunked_vector<model::record_batch> tx_batches;
for (auto& b : all_batches) {
if (b.header().type == model::record_batch_type::tm_update) {
tx_batches.push_back(std::move(b));
4 changes: 2 additions & 2 deletions src/v/cluster/migrations/tx_manager_migrator.h
@@ -112,7 +112,7 @@ class tx_manager_replicate_handler {

private:
ss::future<tx_manager_replicate_reply> do_replicate(
- partition_manager&, model::ntp, fragmented_vector<model::record_batch>);
+ partition_manager&, model::ntp, chunked_vector<model::record_batch>);

ss::sharded<partition_manager>& _partition_manager;
};
@@ -198,7 +198,7 @@ class tx_manager_migrator {
rehash_and_write_partition_data(model::partition_id source_partition_id);
ss::future<std::error_code> rehash_chunk(
model::partition_id source_partition_id,
- fragmented_vector<model::record_batch> batches);
+ chunked_vector<model::record_batch> batches);

ss::future<std::error_code>
copy_from_temporary_to_tx_manager_topic(model::partition_id);
8 changes: 4 additions & 4 deletions src/v/cluster/migrations/tx_manager_migrator_types.h
@@ -53,7 +53,7 @@ struct tx_manager_read_reply
: ec(ec) {}

tx_manager_read_reply(
- fragmented_vector<model::record_batch> batches,
+ chunked_vector<model::record_batch> batches,
model::offset log_dirty_offset)
: ec(errc::success)
, batches(std::move(batches))
@@ -75,7 +75,7 @@ struct tx_manager_read_reply
auto serde_fields() { return std::tie(ec, batches, log_dirty_offset); }

errc ec;
- fragmented_vector<model::record_batch> batches;
+ chunked_vector<model::record_batch> batches;
model::offset log_dirty_offset;

friend bool operator==(
@@ -96,7 +96,7 @@ struct tx_manager_replicate_request
tx_manager_replicate_request() = default;

tx_manager_replicate_request(
- model::ntp ntp, fragmented_vector<model::record_batch> batches)
+ model::ntp ntp, chunked_vector<model::record_batch> batches)
: ntp(std::move(ntp))
, batches(std::move(batches)) {}

@@ -121,7 +121,7 @@ struct tx_manager_replicate_request
auto serde_fields() { return std::tie(ntp, batches); }

model::ntp ntp;
- fragmented_vector<model::record_batch> batches;
+ chunked_vector<model::record_batch> batches;

friend bool operator==(
const tx_manager_replicate_request& lhs,
15 changes: 8 additions & 7 deletions src/v/cluster/partition.cc
@@ -338,15 +338,15 @@ ss::future<storage::translating_reader> partition::make_cloud_reader(
}

ss::future<result<kafka_result>> partition::replicate(
- model::record_batch_reader&& r, raft::replicate_options opts) {
+ chunked_vector<model::record_batch> batches, raft::replicate_options opts) {
using ret_t = result<kafka_result>;
- auto reader = std::move(r);

auto maybe_units = co_await hold_writes_enabled();
if (!maybe_units) {
co_return ret_t(maybe_units.error());
}
- auto res = co_await _raft->replicate(std::move(reader), opts);
+
+ auto res = co_await _raft->replicate(std::move(batches), opts);
if (!res) {
co_return ret_t(res.error());
}
@@ -402,7 +402,7 @@ kafka_stages stages_with_units(

kafka_stages partition::replicate_in_stages(
model::batch_identity bid,
- model::record_batch_reader&& r,
+ chunked_vector<model::record_batch> batches,
raft::replicate_options opts) {
using ret_t = result<kafka_result>;

@@ -430,12 +430,13 @@ kafka_stages partition::replicate_in_stages(
hold_writes_enabled(),
[this,
bid = std::move(bid),
- r = std::move(r),
+ batches = std::move(batches),
opts = std::move(opts)]() mutable {
if (_rm_stm) {
- return _rm_stm->replicate_in_stages(bid, std::move(r), opts);
+ return _rm_stm->replicate_in_stages(
+ bid, std::move(batches), opts);
}
- auto res = _raft->replicate_in_stages(std::move(r), opts);
+ auto res = _raft->replicate_in_stages(std::move(batches), opts);
auto replicate_finished = res.replicate_finished.then(
[this](result<raft::replicate_result> r) {
if (!r) {
6 changes: 3 additions & 3 deletions src/v/cluster/partition.h
@@ -80,8 +80,8 @@ class partition : public ss::enable_lw_shared_from_this<partition> {
/// after a configuration change.
void maybe_construct_archiver();

- ss::future<result<kafka_result>>
- replicate(model::record_batch_reader&&, raft::replicate_options);
+ ss::future<result<kafka_result>> replicate(
+ chunked_vector<model::record_batch> batches, raft::replicate_options);

/// Truncate the beginning of the log up until a given offset
/// Can only be performed on logs that are deletable and non internal
@@ -90,7 +90,7 @@

kafka_stages replicate_in_stages(
model::batch_identity,
- model::record_batch_reader&&,
+ chunked_vector<model::record_batch> batches,
raft::replicate_options);

/**
5 changes: 1 addition & 4 deletions src/v/cluster/partition_properties_stm.cc
@@ -211,10 +211,7 @@ partition_properties_stm::replicate_properties_update(
raft::consistency_level::quorum_ack,
std::chrono::milliseconds(timeout / 1ms));
r_opts.set_force_flush();
- auto r = co_await _raft->replicate(
- _insync_term,
- model::make_memory_record_batch_reader(std::move(b)),
- r_opts);
+ auto r = co_await _raft->replicate(_insync_term, std::move(b), r_opts);

if (r.has_error()) {
vlog(