From 34ae66c04159bc64ebeb753a702c5634146873a6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Micha=C5=82=20Ma=C5=9Blanka?= Date: Mon, 23 Dec 2024 17:59:18 +0100 Subject: [PATCH] r/consensus: used chunked_vector in `raft::replicate` MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit When replicating batches in Redpanda we never use the capabilities of the record batch reader. The reader introduces an additional burden, as it needs to be converted to a vector of batches for batching and handling. Changed the interface to accept a `chunked_vector` or a single batch instead of using readers. Signed-off-by: Michał Maślanka --- src/v/cloud_topics/dl_stm/dl_stm_api.cc | 4 +- .../cluster/archival/archival_metadata_stm.cc | 4 +- .../archival/tests/archival_service_fixture.h | 2 +- src/v/cluster/distributed_kv_stm.h | 5 +- src/v/cluster/id_allocator_stm.cc | 3 +- src/v/cluster/log_eviction_stm.cc | 5 +- .../cluster/migrations/tx_manager_migrator.cc | 17 +- .../cluster/migrations/tx_manager_migrator.h | 4 +- .../migrations/tx_manager_migrator_types.h | 8 +- src/v/cluster/partition.cc | 15 +- src/v/cluster/partition.h | 6 +- src/v/cluster/partition_properties_stm.cc | 5 +- src/v/cluster/rm_stm.cc | 98 ++++---- src/v/cluster/rm_stm.h | 20 +- src/v/cluster/tests/idempotency_tests.cc | 226 +++++++++--------- .../tests/partition_properties_stm_test.cc | 3 +- .../cluster/tests/rebalancing_tests_fixture.h | 4 +- src/v/cluster/tests/rm_stm_tests.cc | 57 ++--- src/v/cluster/tests/tx_compaction_utils.h | 6 +- src/v/cluster/tm_stm.cc | 7 +- src/v/datalake/coordinator/state_machine.cc | 3 +- src/v/datalake/translation/state_machine.cc | 5 +- src/v/kafka/data/partition_proxy.h | 24 +- src/v/kafka/data/replicated_partition.cc | 19 +- src/v/kafka/data/replicated_partition.h | 6 +- src/v/kafka/server/group.cc | 29 +-- src/v/kafka/server/group_manager.cc | 6 +- src/v/kafka/server/handlers/produce.cc | 30 +-- src/v/kafka/server/tests/fetch_test.cc | 34 +--
.../server/tests/replicated_partition_test.cc | 4 +- .../kafka/server/tests/topic_recreate_test.cc | 5 +- .../pandaproxy/schema_registry/validation.cc | 38 +-- src/v/pandaproxy/schema_registry/validation.h | 10 +- src/v/raft/consensus.cc | 35 ++- src/v/raft/consensus.h | 17 +- src/v/raft/mux_state_machine.h | 4 +- src/v/raft/replicate_batcher.cc | 13 +- src/v/raft/replicate_batcher.h | 6 +- src/v/raft/state_machine.cc | 4 +- src/v/raft/tests/append_entries_test.cc | 17 +- src/v/raft/tests/basic_raft_fixture_test.cc | 10 +- src/v/raft/tests/persisted_stm_test.cc | 6 +- src/v/raft/tests/raft_fixture.h | 14 +- src/v/raft/tests/raft_group_fixture.h | 46 ++-- src/v/raft/tests/stm_test_fixture.h | 2 +- src/v/transform/rpc/service.cc | 8 +- .../transform/rpc/tests/transform_rpc_test.cc | 11 +- 47 files changed, 429 insertions(+), 476 deletions(-) diff --git a/src/v/cloud_topics/dl_stm/dl_stm_api.cc b/src/v/cloud_topics/dl_stm/dl_stm_api.cc index 3cd1fcb8f25e6..cab9aa18abac5 100644 --- a/src/v/cloud_topics/dl_stm/dl_stm_api.cc +++ b/src/v/cloud_topics/dl_stm/dl_stm_api.cc @@ -127,11 +127,9 @@ ss::future> dl_stm_api::replicated_apply(model::record_batch&& batch) { model::term_id term = _stm->_raft->term(); - auto reader = model::make_memory_record_batch_reader(std::move(batch)); - auto opts = raft::replicate_options(raft::consistency_level::quorum_ack); opts.set_force_flush(); - auto res = co_await _stm->_raft->replicate(term, std::move(reader), opts); + auto res = co_await _stm->_raft->replicate(term, std::move(batch), opts); if (res.has_error()) { throw std::runtime_error( diff --git a/src/v/cluster/archival/archival_metadata_stm.cc b/src/v/cluster/archival/archival_metadata_stm.cc index 4ea8480291893..0e33c54787f0c 100644 --- a/src/v/cluster/archival/archival_metadata_stm.cc +++ b/src/v/cluster/archival/archival_metadata_stm.cc @@ -895,9 +895,7 @@ ss::future archival_metadata_stm::do_replicate_commands( opts.set_force_flush(); auto result = co_await _raft->replicate( - 
current_term, - model::make_memory_record_batch_reader(std::move(batch)), - opts); + current_term, std::move(batch), opts); if (!result) { vlog( _logger.warn, diff --git a/src/v/cluster/archival/tests/archival_service_fixture.h b/src/v/cluster/archival/tests/archival_service_fixture.h index 3acc7e8e05915..f1a23235bb74a 100644 --- a/src/v/cluster/archival/tests/archival_service_fixture.h +++ b/src/v/cluster/archival/tests/archival_service_fixture.h @@ -568,7 +568,7 @@ class archiver_cluster_fixture result res = partition->raft() ->replicate( - model::make_memory_record_batch_reader(std::move(batches)), + chunked_vector(std::move(batches)), raft::replicate_options(acks)) .get(); diff --git a/src/v/cluster/distributed_kv_stm.h b/src/v/cluster/distributed_kv_stm.h index 10b754cb6e995..bd10734c1a178 100644 --- a/src/v/cluster/distributed_kv_stm.h +++ b/src/v/cluster/distributed_kv_stm.h @@ -399,12 +399,9 @@ class distributed_kv_stm final : public raft::persisted_stm<> { } ss::future replicate_and_wait(simple_batch_builder builder) { - auto batch = std::move(builder).build(); - auto reader = model::make_memory_record_batch_reader(std::move(batch)); - auto r = co_await _raft->replicate( _insync_term, - std::move(reader), + std::move(builder).build(), raft::replicate_options(raft::consistency_level::quorum_ack)); if (!r) { diff --git a/src/v/cluster/id_allocator_stm.cc b/src/v/cluster/id_allocator_stm.cc index c29eba5d59d1e..5db20656aae77 100644 --- a/src/v/cluster/id_allocator_stm.cc +++ b/src/v/cluster/id_allocator_stm.cc @@ -104,10 +104,9 @@ ss::future id_allocator_stm::set_state( int64_t value, model::timeout_clock::duration timeout) { auto batch = serialize_cmd( state_cmd{.next_state = value}, model::record_batch_type::id_allocator); - auto reader = model::make_memory_record_batch_reader(std::move(batch)); auto r = co_await _raft->replicate( _insync_term, - std::move(reader), + std::move(batch), raft::replicate_options(raft::consistency_level::quorum_ack)); if (!r) { 
co_return false; diff --git a/src/v/cluster/log_eviction_stm.cc b/src/v/cluster/log_eviction_stm.cc index 751fc97114727..f223b283a6906 100644 --- a/src/v/cluster/log_eviction_stm.cc +++ b/src/v/cluster/log_eviction_stm.cc @@ -330,10 +330,7 @@ ss::future log_eviction_stm::replicate_command( std::optional> as) { auto opts = raft::replicate_options(raft::consistency_level::quorum_ack); opts.set_force_flush(); - auto fut = _raft->replicate( - _raft->term(), - model::make_memory_record_batch_reader(std::move(batch)), - opts); + auto fut = _raft->replicate(_raft->term(), std::move(batch), opts); /// Execute the replicate command bound by timeout and cancellable via /// abort_source mechanism diff --git a/src/v/cluster/migrations/tx_manager_migrator.cc b/src/v/cluster/migrations/tx_manager_migrator.cc index c3ef4594cf99c..4a4bd87704312 100644 --- a/src/v/cluster/migrations/tx_manager_migrator.cc +++ b/src/v/cluster/migrations/tx_manager_migrator.cc @@ -222,7 +222,7 @@ ss::future tx_manager_read_handler::do_read( reader_cfg.max_bytes = 128_KiB; auto reader = co_await partition->make_reader(std::move(reader_cfg)); - auto batches = co_await model::consume_reader_to_fragmented_memory( + auto batches = co_await model::consume_reader_to_chunked_vector( std::move(reader), model::no_timeout); co_return tx_manager_read_reply( @@ -233,7 +233,7 @@ ss::future tx_manager_replicate_handler::do_replicate( partition_manager& pm, model::ntp ntp, - fragmented_vector batches) { + chunked_vector batches) { vlog( logger.info, "Requested replication of {} batches to {}", @@ -245,7 +245,7 @@ tx_manager_replicate_handler::do_replicate( } auto r = co_await partition->replicate( - model::make_fragmented_memory_record_batch_reader(std::move(batches)), + std::move(batches), raft::replicate_options(raft::consistency_level::quorum_ack)); if (r.has_error()) { @@ -549,17 +549,16 @@ tx_manager_migrator::rehash_and_write_partition_data( std::move(source_ntp), default_timeout, _as, - [this, - 
source_partition_id](fragmented_vector batches) { + [this, source_partition_id](chunked_vector batches) { return rehash_chunk(source_partition_id, std::move(batches)); }); } ss::future tx_manager_migrator::rehash_chunk( model::partition_id source_partition_id, - fragmented_vector batches) { + chunked_vector batches) { absl:: - flat_hash_map> + flat_hash_map> target_partition_batches; target_partition_batches.reserve(_requested_partition_count); for (auto& b : batches) { @@ -632,8 +631,8 @@ tx_manager_migrator::copy_from_temporary_to_tx_manager_topic( default_timeout, _as, [this, target_ntp = std::move(target_ntp)]( - fragmented_vector all_batches) { - fragmented_vector tx_batches; + chunked_vector all_batches) { + chunked_vector tx_batches; for (auto& b : all_batches) { if (b.header().type == model::record_batch_type::tm_update) { tx_batches.push_back(std::move(b)); diff --git a/src/v/cluster/migrations/tx_manager_migrator.h b/src/v/cluster/migrations/tx_manager_migrator.h index e417534fd23cd..9f7e4dee5fefa 100644 --- a/src/v/cluster/migrations/tx_manager_migrator.h +++ b/src/v/cluster/migrations/tx_manager_migrator.h @@ -112,7 +112,7 @@ class tx_manager_replicate_handler { private: ss::future do_replicate( - partition_manager&, model::ntp, fragmented_vector); + partition_manager&, model::ntp, chunked_vector); ss::sharded& _partition_manager; }; @@ -198,7 +198,7 @@ class tx_manager_migrator { rehash_and_write_partition_data(model::partition_id source_partition_id); ss::future rehash_chunk( model::partition_id source_partition_id, - fragmented_vector batches); + chunked_vector batches); ss::future copy_from_temporary_to_tx_manager_topic(model::partition_id); diff --git a/src/v/cluster/migrations/tx_manager_migrator_types.h b/src/v/cluster/migrations/tx_manager_migrator_types.h index cbb2b5f3e8617..16e36dd14fdc4 100644 --- a/src/v/cluster/migrations/tx_manager_migrator_types.h +++ b/src/v/cluster/migrations/tx_manager_migrator_types.h @@ -53,7 +53,7 @@ struct 
tx_manager_read_reply : ec(ec) {} tx_manager_read_reply( - fragmented_vector batches, + chunked_vector batches, model::offset log_dirty_offset) : ec(errc::success) , batches(std::move(batches)) @@ -75,7 +75,7 @@ struct tx_manager_read_reply auto serde_fields() { return std::tie(ec, batches, log_dirty_offset); } errc ec; - fragmented_vector batches; + chunked_vector batches; model::offset log_dirty_offset; friend bool operator==( @@ -96,7 +96,7 @@ struct tx_manager_replicate_request tx_manager_replicate_request() = default; tx_manager_replicate_request( - model::ntp ntp, fragmented_vector batches) + model::ntp ntp, chunked_vector batches) : ntp(std::move(ntp)) , batches(std::move(batches)) {} @@ -121,7 +121,7 @@ struct tx_manager_replicate_request auto serde_fields() { return std::tie(ntp, batches); } model::ntp ntp; - fragmented_vector batches; + chunked_vector batches; friend bool operator==( const tx_manager_replicate_request& lhs, diff --git a/src/v/cluster/partition.cc b/src/v/cluster/partition.cc index d8337ad611f9b..f444a9b8de2a4 100644 --- a/src/v/cluster/partition.cc +++ b/src/v/cluster/partition.cc @@ -338,15 +338,15 @@ ss::future partition::make_cloud_reader( } ss::future> partition::replicate( - model::record_batch_reader&& r, raft::replicate_options opts) { + chunked_vector batches, raft::replicate_options opts) { using ret_t = result; - auto reader = std::move(r); auto maybe_units = co_await hold_writes_enabled(); if (!maybe_units) { co_return ret_t(maybe_units.error()); } - auto res = co_await _raft->replicate(std::move(reader), opts); + + auto res = co_await _raft->replicate(std::move(batches), opts); if (!res) { co_return ret_t(res.error()); } @@ -402,7 +402,7 @@ kafka_stages stages_with_units( kafka_stages partition::replicate_in_stages( model::batch_identity bid, - model::record_batch_reader&& r, + chunked_vector batches, raft::replicate_options opts) { using ret_t = result; @@ -430,12 +430,13 @@ kafka_stages partition::replicate_in_stages( 
hold_writes_enabled(), [this, bid = std::move(bid), - r = std::move(r), + batches = std::move(batches), opts = std::move(opts)]() mutable { if (_rm_stm) { - return _rm_stm->replicate_in_stages(bid, std::move(r), opts); + return _rm_stm->replicate_in_stages( + bid, std::move(batches), opts); } - auto res = _raft->replicate_in_stages(std::move(r), opts); + auto res = _raft->replicate_in_stages(std::move(batches), opts); auto replicate_finished = res.replicate_finished.then( [this](result r) { if (!r) { diff --git a/src/v/cluster/partition.h b/src/v/cluster/partition.h index e571b4e9b737e..cbd12449b0057 100644 --- a/src/v/cluster/partition.h +++ b/src/v/cluster/partition.h @@ -80,8 +80,8 @@ class partition : public ss::enable_lw_shared_from_this { /// after a configuration change. void maybe_construct_archiver(); - ss::future> - replicate(model::record_batch_reader&&, raft::replicate_options); + ss::future> replicate( + chunked_vector batches, raft::replicate_options); /// Truncate the beginning of the log up until a given offset /// Can only be performed on logs that are deletable and non internal @@ -90,7 +90,7 @@ class partition : public ss::enable_lw_shared_from_this { kafka_stages replicate_in_stages( model::batch_identity, - model::record_batch_reader&&, + chunked_vector batches, raft::replicate_options); /** diff --git a/src/v/cluster/partition_properties_stm.cc b/src/v/cluster/partition_properties_stm.cc index 438fea8be3f34..919fbbc1540a3 100644 --- a/src/v/cluster/partition_properties_stm.cc +++ b/src/v/cluster/partition_properties_stm.cc @@ -210,10 +210,7 @@ partition_properties_stm::replicate_properties_update( raft::consistency_level::quorum_ack, std::chrono::milliseconds(timeout / 1ms)); r_opts.set_force_flush(); - auto r = co_await _raft->replicate( - _insync_term, - model::make_memory_record_batch_reader(std::move(b)), - r_opts); + auto r = co_await _raft->replicate(_insync_term, std::move(b), r_opts); if (r.has_error()) { vlog( diff --git 
a/src/v/cluster/rm_stm.cc b/src/v/cluster/rm_stm.cc index e74f7cfda5436..40af99101bc4c 100644 --- a/src/v/cluster/rm_stm.cc +++ b/src/v/cluster/rm_stm.cc @@ -380,9 +380,8 @@ ss::future> rm_stm::do_begin_tx( model::record_batch batch = make_fence_batch( pid, tx_seq, transaction_timeout_ms, tm); - auto reader = model::make_memory_record_batch_reader(std::move(batch)); auto r = co_await _raft->replicate( - synced_term, std::move(reader), make_replicate_options()); + synced_term, std::move(batch), make_replicate_options()); if (!r) { vlog( @@ -527,9 +526,9 @@ ss::future rm_stm::do_commit_tx( auto batch = make_tx_control_batch( pid, model::control_record_type::tx_commit); - auto reader = model::make_memory_record_batch_reader(std::move(batch)); + auto r = co_await _raft->replicate( - synced_term, std::move(reader), make_replicate_options()); + synced_term, std::move(batch), make_replicate_options()); if (!r) { vlog( @@ -680,9 +679,8 @@ ss::future rm_stm::do_abort_tx( auto batch = make_tx_control_batch( pid, model::control_record_type::tx_abort); - auto reader = model::make_memory_record_batch_reader(std::move(batch)); auto r = co_await _raft->replicate( - synced_term, std::move(reader), make_replicate_options()); + synced_term, std::move(batch), make_replicate_options()); if (!r) { vlog( @@ -717,32 +715,37 @@ ss::future rm_stm::do_abort_tx( kafka_stages rm_stm::replicate_in_stages( model::batch_identity bid, - model::record_batch_reader r, + chunked_vector batches, raft::replicate_options opts) { auto enqueued = ss::make_lw_shared>(); auto f = enqueued->get_future(); - auto replicate_finished - = do_replicate(bid, std::move(r), opts, enqueued).finally([enqueued] { - // we should avoid situations when replicate_finished is set while - // enqueued isn't because it leads to hanging produce requests and - // the resource leaks. 
since staged replication is an optimization - // and setting enqueued only after replicate_finished is already - // set doesn't have sematic implications adding this post - // replicate_finished as a safety measure in case enqueued isn't - // set explicitly - if (!enqueued->available()) { - enqueued->set_value(); - } - }); - return kafka_stages(std::move(f), std::move(replicate_finished)); + auto replicate_finished = do_replicate( + bid, std::move(batches), opts, enqueued) + .finally([enqueued] { + // we should avoid situations when + // replicate_finished is set while enqueued + // isn't because it leads to hanging produce + // requests and the resource leaks. since + // staged replication is an optimization and + // setting enqueued only after + // replicate_finished is already set doesn't + // have sematic implications adding this + // post replicate_finished as a safety + // measure in case enqueued isn't set + // explicitly + if (!enqueued->available()) { + enqueued->set_value(); + } + }); + return {std::move(f), std::move(replicate_finished)}; } ss::future> rm_stm::replicate( model::batch_identity bid, - model::record_batch_reader r, + chunked_vector batches, raft::replicate_options opts) { auto enqueued = ss::make_lw_shared>(); - return do_replicate(bid, std::move(r), opts, enqueued); + return do_replicate(bid, std::move(batches), opts, enqueued); } ss::future::holder> rm_stm::prepare_transfer_leadership() { @@ -751,18 +754,18 @@ ss::future::holder> rm_stm::prepare_transfer_leadership() { ss::future> rm_stm::do_replicate( model::batch_identity bid, - model::record_batch_reader b, + chunked_vector batches, raft::replicate_options opts, ss::lw_shared_ptr> enqueued) { auto holder = _gate.hold(); auto unit = co_await _state_lock.hold_read_lock(); if (bid.is_transactional) { - co_return co_await transactional_replicate(bid, std::move(b)); + co_return co_await transactional_replicate(bid, std::move(batches)); } else if (bid.is_idempotent()) { co_return co_await 
idempotent_replicate( - bid, std::move(b), opts, enqueued); + bid, std::move(batches), opts, enqueued); } - co_return co_await replicate_msg(std::move(b), opts, enqueued); + co_return co_await replicate_msg(std::move(batches), opts, enqueued); } ss::future<> rm_stm::stop() { @@ -835,9 +838,9 @@ ss::future> rm_stm::transactional_replicate( model::term_id synced_term, producer_ptr producer, model::batch_identity bid, - model::record_batch_reader rdr) { + chunked_vector batches) { auto result = co_await do_transactional_replicate( - synced_term, producer, bid, std::move(rdr)); + synced_term, producer, bid, std::move(batches)); if (!result) { vlog( _ctx_log.trace, @@ -864,7 +867,7 @@ ss::future> rm_stm::do_transactional_replicate( model::term_id synced_term, producer_ptr producer, model::batch_identity bid, - model::record_batch_reader rdr) { + chunked_vector batches) { if (producer->id().epoch != bid.pid.epoch) { vlog( _ctx_log.warn, @@ -908,7 +911,7 @@ ss::future> rm_stm::do_transactional_replicate( req_ptr->mark_request_in_progress(); auto r = co_await _raft->replicate( - synced_term, std::move(rdr), make_replicate_options()); + synced_term, std::move(batches), make_replicate_options()); if (!r) { vlog( _ctx_log.warn, @@ -939,7 +942,7 @@ ss::future> rm_stm::do_transactional_replicate( } ss::future> rm_stm::transactional_replicate( - model::batch_identity bid, model::record_batch_reader rdr) { + model::batch_identity bid, chunked_vector batches) { if (!check_tx_permitted()) { co_return cluster::errc::generic_tx_error; } @@ -955,7 +958,7 @@ ss::future> rm_stm::transactional_replicate( co_return co_await producer->run_with_lock( [&, synced_term](ssx::semaphore_units units) { return do_transactional_replicate( - synced_term, producer, bid, std::move(rdr)) + synced_term, producer, bid, std::move(batches)) .finally([units = std::move(units)] {}); }); } @@ -964,7 +967,7 @@ ss::future> rm_stm::idempotent_replicate( model::term_id synced_term, producer_ptr producer, 
model::batch_identity bid, - model::record_batch_reader br, + chunked_vector batches, raft::replicate_options opts, ss::lw_shared_ptr> enqueued, ssx::semaphore_units units, @@ -973,7 +976,7 @@ ss::future> rm_stm::idempotent_replicate( synced_term, producer, bid, - std::move(br), + std::move(batches), opts, std::move(enqueued), units, @@ -1017,7 +1020,7 @@ ss::future> rm_stm::do_idempotent_replicate( model::term_id synced_term, producer_ptr producer, model::batch_identity bid, - model::record_batch_reader br, + chunked_vector batches, raft::replicate_options opts, ss::lw_shared_ptr> enqueued, ssx::semaphore_units& units, @@ -1055,7 +1058,8 @@ ss::future> rm_stm::do_idempotent_replicate( } req_ptr->mark_request_in_progress(); - auto stages = _raft->replicate_in_stages(synced_term, std::move(br), opts); + auto stages = _raft->replicate_in_stages( + synced_term, std::move(batches), opts); auto req_enqueued = co_await ss::coroutine::as_future( std::move(stages.request_enqueued)); if (req_enqueued.failed()) { @@ -1091,7 +1095,7 @@ ss::future> rm_stm::do_idempotent_replicate( ss::future> rm_stm::idempotent_replicate( model::batch_identity bid, - model::record_batch_reader br, + chunked_vector batches, raft::replicate_options opts, ss::lw_shared_ptr> enqueued) { if (!co_await sync(_sync_timeout())) { @@ -1108,7 +1112,7 @@ ss::future> rm_stm::idempotent_replicate( synced_term, producer, bid, - std::move(br), + std::move(batches), opts, std::move(enqueued), std::move(units), @@ -1127,7 +1131,7 @@ ss::future> rm_stm::idempotent_replicate( } ss::future> rm_stm::replicate_msg( - model::record_batch_reader br, + chunked_vector batches, raft::replicate_options opts, ss::lw_shared_ptr> enqueued) { using ret_t = result; @@ -1136,7 +1140,8 @@ ss::future> rm_stm::replicate_msg( co_return cluster::errc::not_leader; } - auto ss = _raft->replicate_in_stages(_insync_term, std::move(br), opts); + auto ss = _raft->replicate_in_stages( + _insync_term, std::move(batches), opts); co_await 
std::move(ss.request_enqueued); enqueued->set_value(); auto r = co_await std::move(ss.replicate_finished); @@ -1399,10 +1404,8 @@ ss::future rm_stm::do_try_abort_old_tx(producer_ptr producer) { _ctx_log.trace, "pid:{} tx_seq:{} is committed", pid, tx_seq); auto batch = make_tx_control_batch( pid, model::control_record_type::tx_commit); - auto reader = model::make_memory_record_batch_reader( - std::move(batch)); auto cr = co_await _raft->replicate( - synced_term, std::move(reader), make_replicate_options()); + synced_term, std::move(batch), make_replicate_options()); if (!cr) { vlog( _ctx_log.warn, @@ -1440,10 +1443,8 @@ ss::future rm_stm::do_try_abort_old_tx(producer_ptr producer) { _ctx_log.trace, "pid:{} tx_seq:{} is aborted", pid, tx_seq); auto batch = make_tx_control_batch( pid, model::control_record_type::tx_abort); - auto reader = model::make_memory_record_batch_reader( - std::move(batch)); auto cr = co_await _raft->replicate( - synced_term, std::move(reader), make_replicate_options()); + synced_term, std::move(batch), make_replicate_options()); if (!cr) { vlog( _ctx_log.warn, @@ -1495,9 +1496,8 @@ ss::future rm_stm::do_try_abort_old_tx(producer_ptr producer) { auto batch = make_tx_control_batch( pid, model::control_record_type::tx_abort); - auto reader = model::make_memory_record_batch_reader(std::move(batch)); auto cr = co_await _raft->replicate( - _insync_term, std::move(reader), make_replicate_options()); + _insync_term, std::move(batch), make_replicate_options()); if (!cr) { vlog( diff --git a/src/v/cluster/rm_stm.h b/src/v/cluster/rm_stm.h index 4fe2cd6ee0677..6e7e0b1c436ef 100644 --- a/src/v/cluster/rm_stm.h +++ b/src/v/cluster/rm_stm.h @@ -197,12 +197,12 @@ class rm_stm final : public raft::persisted_stm<> { kafka_stages replicate_in_stages( model::batch_identity, - model::record_batch_reader, + chunked_vector batches, raft::replicate_options); ss::future> replicate( model::batch_identity, - model::record_batch_reader, + chunked_vector batches, 
raft::replicate_options); ss::future::holder> prepare_transfer_leadership(); @@ -274,28 +274,28 @@ class rm_stm final : public raft::persisted_stm<> { ss::future> do_replicate( model::batch_identity, - model::record_batch_reader, + chunked_vector, raft::replicate_options, ss::lw_shared_ptr>); ss::future> transactional_replicate( - model::batch_identity, model::record_batch_reader); + model::batch_identity, chunked_vector); ss::future> transactional_replicate( model::term_id, tx::producer_ptr, model::batch_identity, - model::record_batch_reader); + chunked_vector); ss::future> do_transactional_replicate( model::term_id, tx::producer_ptr, model::batch_identity, - model::record_batch_reader); + chunked_vector); ss::future> idempotent_replicate( model::batch_identity, - model::record_batch_reader, + chunked_vector, raft::replicate_options, ss::lw_shared_ptr>); @@ -303,7 +303,7 @@ class rm_stm final : public raft::persisted_stm<> { model::term_id, tx::producer_ptr, model::batch_identity, - model::record_batch_reader, + chunked_vector, raft::replicate_options, ss::lw_shared_ptr>, ssx::semaphore_units&, @@ -313,14 +313,14 @@ class rm_stm final : public raft::persisted_stm<> { model::term_id, tx::producer_ptr, model::batch_identity, - model::record_batch_reader, + chunked_vector, raft::replicate_options, ss::lw_shared_ptr>, ssx::semaphore_units, producer_previously_known); ss::future> replicate_msg( - model::record_batch_reader, + chunked_vector, raft::replicate_options, ss::lw_shared_ptr>); diff --git a/src/v/cluster/tests/idempotency_tests.cc b/src/v/cluster/tests/idempotency_tests.cc index a7c279f950a5f..71c551200a375 100644 --- a/src/v/cluster/tests/idempotency_tests.cc +++ b/src/v/cluster/tests/idempotency_tests.cc @@ -41,13 +41,14 @@ FIXTURE_TEST( wait_for_meta_initialized(); auto count = 5; - auto rdr1 = random_batch_reader(model::test::record_batch_spec{ - .offset = model::offset(0), - .allow_compression = true, - .count = count, - .producer_id = -1, - 
.producer_epoch = 0, - .base_sequence = 0}); + auto batches1 = random_batches(model::test::record_batch_spec{ + .offset = model::offset(0), + .allow_compression = true, + .count = count, + .producer_id = -1, + .producer_epoch = 0, + .base_sequence = 0}) + .get(); auto bid1 = model::batch_identity{ .pid = model::producer_identity{-1, 0}, .first_seq = 0, @@ -55,19 +56,20 @@ FIXTURE_TEST( auto r1 = stm .replicate( bid1, - std::move(rdr1), + std::move(batches1), raft::replicate_options(raft::consistency_level::quorum_ack)) .get(); BOOST_REQUIRE((bool)r1); - auto rdr2 = random_batch_reader(model::test::record_batch_spec{ - .offset = model::offset(count), - .allow_compression = true, - .count = count, - .enable_idempotence = true, - .producer_id = -1, - .producer_epoch = 0, - .base_sequence = 0}); + auto batches2 = random_batches(model::test::record_batch_spec{ + .offset = model::offset(count), + .allow_compression = true, + .count = count, + .enable_idempotence = true, + .producer_id = -1, + .producer_epoch = 0, + .base_sequence = 0}) + .get(); auto bid2 = model::batch_identity{ .pid = model::producer_identity{-1, 0}, .first_seq = 0, @@ -75,7 +77,7 @@ FIXTURE_TEST( auto r2 = stm .replicate( bid2, - std::move(rdr2), + std::move(batches2), raft::replicate_options(raft::consistency_level::quorum_ack)) .get(); BOOST_REQUIRE((bool)r2); @@ -93,14 +95,15 @@ FIXTURE_TEST( wait_for_meta_initialized(); auto count = 5; - auto rdr1 = random_batch_reader(model::test::record_batch_spec{ - .offset = model::offset(0), - .allow_compression = true, - .count = count, - .enable_idempotence = true, - .producer_id = 1, - .producer_epoch = 0, - .base_sequence = 0}); + auto batches1 = random_batches(model::test::record_batch_spec{ + .offset = model::offset(0), + .allow_compression = true, + .count = count, + .enable_idempotence = true, + .producer_id = 1, + .producer_epoch = 0, + .base_sequence = 0}) + .get(); auto bid1 = model::batch_identity{ .pid = model::producer_identity{1, 0}, .first_seq 
= 0, @@ -108,19 +111,20 @@ FIXTURE_TEST( auto r1 = stm .replicate( bid1, - std::move(rdr1), + std::move(batches1), raft::replicate_options(raft::consistency_level::quorum_ack)) .get(); BOOST_REQUIRE((bool)r1); - auto rdr2 = random_batch_reader(model::test::record_batch_spec{ - .offset = model::offset(count), - .allow_compression = true, - .count = count, - .enable_idempotence = true, - .producer_id = 1, - .producer_epoch = 0, - .base_sequence = count}); + auto batches2 = random_batches(model::test::record_batch_spec{ + .offset = model::offset(count), + .allow_compression = true, + .count = count, + .enable_idempotence = true, + .producer_id = 1, + .producer_epoch = 0, + .base_sequence = count}) + .get(); auto bid2 = model::batch_identity{ .pid = model::producer_identity{1, 0}, .first_seq = count, @@ -128,7 +132,7 @@ FIXTURE_TEST( auto r2 = stm .replicate( bid2, - std::move(rdr2), + std::move(batches2), raft::replicate_options(raft::consistency_level::quorum_ack)) .get(); BOOST_REQUIRE((bool)r2); @@ -151,14 +155,15 @@ FIXTURE_TEST(test_rm_stm_caches_last_5_offsets, rm_stm_test_fixture) { auto count = 5; for (int i = 0; i < 10; i++) { - auto rdr = random_batch_reader(model::test::record_batch_spec{ - .offset = model::offset(i * count), - .allow_compression = true, - .count = count, - .enable_idempotence = true, - .producer_id = 1, - .producer_epoch = 0, - .base_sequence = i * count}); + auto batches = random_batches(model::test::record_batch_spec{ + .offset = model::offset(i * count), + .allow_compression = true, + .count = count, + .enable_idempotence = true, + .producer_id = 1, + .producer_epoch = 0, + .base_sequence = i * count}) + .get(); auto bid = model::batch_identity{ .pid = model::producer_identity{1, 0}, .first_seq = i * count, @@ -166,7 +171,7 @@ FIXTURE_TEST(test_rm_stm_caches_last_5_offsets, rm_stm_test_fixture) { auto r1 = stm .replicate( bid, - std::move(rdr), + std::move(batches), raft::replicate_options( raft::consistency_level::quorum_ack)) .get(); 
@@ -178,14 +183,15 @@ FIXTURE_TEST(test_rm_stm_caches_last_5_offsets, rm_stm_test_fixture) { // pid and seq numbers the duplicated request should yield the same // offsets for (int i = 5; i < 10; i++) { - auto rdr = random_batch_reader(model::test::record_batch_spec{ - .offset = model::offset(i * count), - .allow_compression = true, - .count = count, - .enable_idempotence = true, - .producer_id = 1, - .producer_epoch = 0, - .base_sequence = i * count}); + auto batches = random_batches(model::test::record_batch_spec{ + .offset = model::offset(i * count), + .allow_compression = true, + .count = count, + .enable_idempotence = true, + .producer_id = 1, + .producer_epoch = 0, + .base_sequence = i * count}) + .get(); auto bid = model::batch_identity{ .pid = model::producer_identity{1, 0}, .first_seq = i * count, @@ -193,7 +199,7 @@ FIXTURE_TEST(test_rm_stm_caches_last_5_offsets, rm_stm_test_fixture) { auto r1 = stm .replicate( bid, - std::move(rdr), + std::move(batches), raft::replicate_options( raft::consistency_level::quorum_ack)) .get(); @@ -215,14 +221,15 @@ FIXTURE_TEST(test_rm_stm_doesnt_cache_6th_offset, rm_stm_test_fixture) { auto count = 5; for (int i = 0; i < 6; i++) { - auto rdr = random_batch_reader(model::test::record_batch_spec{ - .offset = model::offset(i * count), - .allow_compression = true, - .count = count, - .enable_idempotence = true, - .producer_id = 1, - .producer_epoch = 0, - .base_sequence = i * count}); + auto batches = random_batches(model::test::record_batch_spec{ + .offset = model::offset(i * count), + .allow_compression = true, + .count = count, + .enable_idempotence = true, + .producer_id = 1, + .producer_epoch = 0, + .base_sequence = i * count}) + .get(); auto bid = model::batch_identity{ .pid = model::producer_identity{1, 0}, .first_seq = i * count, @@ -230,7 +237,7 @@ FIXTURE_TEST(test_rm_stm_doesnt_cache_6th_offset, rm_stm_test_fixture) { auto r1 = stm .replicate( bid, - std::move(rdr), + std::move(batches), raft::replicate_options( 
raft::consistency_level::quorum_ack)) .get(); @@ -239,14 +246,15 @@ FIXTURE_TEST(test_rm_stm_doesnt_cache_6th_offset, rm_stm_test_fixture) { } { - auto rdr = random_batch_reader(model::test::record_batch_spec{ - .offset = model::offset(0), - .allow_compression = true, - .count = count, - .enable_idempotence = true, - .producer_id = 1, - .producer_epoch = 0, - .base_sequence = 0}); + auto batches = random_batches(model::test::record_batch_spec{ + .offset = model::offset(0), + .allow_compression = true, + .count = count, + .enable_idempotence = true, + .producer_id = 1, + .producer_epoch = 0, + .base_sequence = 0}) + .get(); auto bid = model::batch_identity{ .pid = model::producer_identity{1, 0}, .first_seq = 0, @@ -254,7 +262,7 @@ FIXTURE_TEST(test_rm_stm_doesnt_cache_6th_offset, rm_stm_test_fixture) { auto r1 = stm .replicate( bid, - std::move(rdr), + std::move(batches), raft::replicate_options( raft::consistency_level::quorum_ack)) .get(); @@ -275,14 +283,15 @@ FIXTURE_TEST(test_rm_stm_prevents_gaps, rm_stm_test_fixture) { wait_for_meta_initialized(); auto count = 5; - auto rdr1 = random_batch_reader(model::test::record_batch_spec{ - .offset = model::offset(0), - .allow_compression = true, - .count = count, - .enable_idempotence = true, - .producer_id = 1, - .producer_epoch = 0, - .base_sequence = 0}); + auto batches1 = random_batches(model::test::record_batch_spec{ + .offset = model::offset(0), + .allow_compression = true, + .count = count, + .enable_idempotence = true, + .producer_id = 1, + .producer_epoch = 0, + .base_sequence = 0}) + .get(); auto bid1 = model::batch_identity{ .pid = model::producer_identity{1, 0}, .first_seq = 0, @@ -290,19 +299,20 @@ FIXTURE_TEST(test_rm_stm_prevents_gaps, rm_stm_test_fixture) { auto r1 = stm .replicate( bid1, - std::move(rdr1), + std::move(batches1), raft::replicate_options(raft::consistency_level::quorum_ack)) .get(); BOOST_REQUIRE((bool)r1); - auto rdr2 = random_batch_reader(model::test::record_batch_spec{ - .offset = 
model::offset(count), - .allow_compression = true, - .count = count, - .enable_idempotence = true, - .producer_id = 1, - .producer_epoch = 0, - .base_sequence = count + 1}); + auto batches2 = random_batches(model::test::record_batch_spec{ + .offset = model::offset(count), + .allow_compression = true, + .count = count, + .enable_idempotence = true, + .producer_id = 1, + .producer_epoch = 0, + .base_sequence = count + 1}) + .get(); auto bid2 = model::batch_identity{ .pid = model::producer_identity{1, 0}, .first_seq = count + 1, @@ -310,7 +320,7 @@ FIXTURE_TEST(test_rm_stm_prevents_gaps, rm_stm_test_fixture) { auto r2 = stm .replicate( bid2, - std::move(rdr2), + std::move(batches2), raft::replicate_options(raft::consistency_level::quorum_ack)) .get(); BOOST_REQUIRE( @@ -328,14 +338,15 @@ FIXTURE_TEST(test_rm_stm_passes_immediate_retry, rm_stm_test_fixture) { wait_for_meta_initialized(); auto count = 5; - auto rdr1 = random_batch_reader(model::test::record_batch_spec{ - .offset = model::offset(0), - .allow_compression = true, - .count = count, - .enable_idempotence = true, - .producer_id = 1, - .producer_epoch = 0, - .base_sequence = 0}); + auto batches1 = random_batches(model::test::record_batch_spec{ + .offset = model::offset(0), + .allow_compression = true, + .count = count, + .enable_idempotence = true, + .producer_id = 1, + .producer_epoch = 0, + .base_sequence = 0}) + .get(); auto bid1 = model::batch_identity{ .pid = model::producer_identity{1, 0}, .first_seq = 0, @@ -344,14 +355,15 @@ FIXTURE_TEST(test_rm_stm_passes_immediate_retry, rm_stm_test_fixture) { // replicate caches only metadata so as long as batches have the same // pid and seq numbers the duplicated request should yield the same // offsets - auto rdr2 = random_batch_reader(model::test::record_batch_spec{ - .offset = model::offset(0), - .allow_compression = true, - .count = count, - .enable_idempotence = true, - .producer_id = 1, - .producer_epoch = 0, - .base_sequence = 0}); + auto batches2 = 
random_batches(model::test::record_batch_spec{ + .offset = model::offset(0), + .allow_compression = true, + .count = count, + .enable_idempotence = true, + .producer_id = 1, + .producer_epoch = 0, + .base_sequence = 0}) + .get(); auto bid2 = model::batch_identity{ .pid = model::producer_identity{1, 0}, .first_seq = 0, @@ -359,11 +371,11 @@ FIXTURE_TEST(test_rm_stm_passes_immediate_retry, rm_stm_test_fixture) { auto f1 = stm.replicate( bid1, - std::move(rdr1), + std::move(batches1), raft::replicate_options(raft::consistency_level::quorum_ack)); auto f2 = stm.replicate( bid2, - std::move(rdr2), + std::move(batches2), raft::replicate_options(raft::consistency_level::quorum_ack)); auto r2 = f2.get(); auto r1 = f1.get(); diff --git a/src/v/cluster/tests/partition_properties_stm_test.cc b/src/v/cluster/tests/partition_properties_stm_test.cc index d402db1f19e39..029ef59c236ee 100644 --- a/src/v/cluster/tests/partition_properties_stm_test.cc +++ b/src/v/cluster/tests/partition_properties_stm_test.cc @@ -118,8 +118,7 @@ struct partition_properties_stm_fixture : raft::raft_fixture { .then([&](ss::circular_buffer batches) { return leader_node.raft() ->replicate( - model::make_memory_record_batch_reader( - std::move(batches)), + chunked_vector(std::move(batches)), raft::replicate_options( raft::consistency_level::quorum_ack)) .then([](result res) { diff --git a/src/v/cluster/tests/rebalancing_tests_fixture.h b/src/v/cluster/tests/rebalancing_tests_fixture.h index 5c9554f7e2b4a..9a1b7aa51134f 100644 --- a/src/v/cluster/tests/rebalancing_tests_fixture.h +++ b/src/v/cluster/tests/rebalancing_tests_fixture.h @@ -133,11 +133,9 @@ class rebalancing_tests_fixture : public cluster_test_fixture { auto single_retry = [count, ntp](cluster::partition_manager& pm) { return model::test::make_random_batches(model::offset(0), count) .then([&pm, ntp](auto batches) { - auto rdr = model::make_memory_record_batch_reader( - std::move(batches)); // replicate auto f = pm.get(ntp)->raft()->replicate( 
- std::move(rdr), + chunked_vector(std::move(batches)), raft::replicate_options( raft::consistency_level::quorum_ack)); diff --git a/src/v/cluster/tests/rm_stm_tests.cc b/src/v/cluster/tests/rm_stm_tests.cc index 4cbf67791039c..86fcf736a3762 100644 --- a/src/v/cluster/tests/rm_stm_tests.cc +++ b/src/v/cluster/tests/rm_stm_tests.cc @@ -37,27 +37,28 @@ using namespace std::chrono_literals; static const failure_type invalid_producer_epoch(cluster::errc::invalid_producer_epoch); -struct rich_reader { +struct batches_with_identity { model::batch_identity id; - model::record_batch_reader reader; + chunked_vector batches; }; -static rich_reader make_rreader( +static batches_with_identity make_rreader( model::producer_identity pid, int first_seq, int count, bool is_transactional) { - return rich_reader{ + return batches_with_identity{ .id = model:: batch_identity{.pid = pid, .first_seq = first_seq, .last_seq = first_seq + count - 1, .record_count = count, .is_transactional = is_transactional}, - .reader = random_batch_reader(model::test::record_batch_spec{ - .offset = model::offset(0), - .allow_compression = true, - .count = count, - .producer_id = pid.id, - .producer_epoch = pid.epoch, - .base_sequence = first_seq, - .is_transactional = is_transactional})}; + .batches = random_batches(model::test::record_batch_spec{ + .offset = model::offset(0), + .allow_compression = true, + .count = count, + .producer_id = pid.id, + .producer_epoch = pid.epoch, + .base_sequence = first_seq, + .is_transactional = is_transactional}) + .get()}; } void check_snapshot_sizes(cluster::rm_stm& stm, raft::consensus* c) { @@ -110,7 +111,7 @@ FIXTURE_TEST(test_tx_happy_tx, rm_stm_test_fixture) { auto offset_r = stm .replicate( rreader.id, - std::move(rreader.reader), + std::move(rreader.batches), raft::replicate_options( raft::consistency_level::quorum_ack)) .get(); @@ -140,7 +141,7 @@ FIXTURE_TEST(test_tx_happy_tx, rm_stm_test_fixture) { offset_r = stm .replicate( rreader.id, - 
std::move(rreader.reader), + std::move(rreader.batches), raft::replicate_options(raft::consistency_level::quorum_ack)) .get(); BOOST_REQUIRE((bool)offset_r); @@ -187,7 +188,7 @@ FIXTURE_TEST(test_tx_aborted_tx_1, rm_stm_test_fixture) { auto offset_r = stm .replicate( rreader.id, - std::move(rreader.reader), + std::move(rreader.batches), raft::replicate_options( raft::consistency_level::quorum_ack)) .get(); @@ -217,7 +218,7 @@ FIXTURE_TEST(test_tx_aborted_tx_1, rm_stm_test_fixture) { offset_r = stm .replicate( rreader.id, - std::move(rreader.reader), + std::move(rreader.batches), raft::replicate_options(raft::consistency_level::quorum_ack)) .get(); BOOST_REQUIRE((bool)offset_r); @@ -274,7 +275,7 @@ FIXTURE_TEST(test_tx_aborted_tx_2, rm_stm_test_fixture) { auto offset_r = stm .replicate( rreader.id, - std::move(rreader.reader), + std::move(rreader.batches), raft::replicate_options( raft::consistency_level::quorum_ack)) .get(); @@ -304,7 +305,7 @@ FIXTURE_TEST(test_tx_aborted_tx_2, rm_stm_test_fixture) { offset_r = stm .replicate( rreader.id, - std::move(rreader.reader), + std::move(rreader.batches), raft::replicate_options(raft::consistency_level::quorum_ack)) .get(); BOOST_REQUIRE_EQUAL(stm.highest_producer_id(), pid2.get_id()); @@ -356,7 +357,7 @@ FIXTURE_TEST(test_tx_unknown_produce, rm_stm_test_fixture) { auto offset_r = stm .replicate( rreader.id, - std::move(rreader.reader), + std::move(rreader.batches), raft::replicate_options( raft::consistency_level::quorum_ack)) .get(); @@ -369,7 +370,7 @@ FIXTURE_TEST(test_tx_unknown_produce, rm_stm_test_fixture) { offset_r = stm .replicate( rreader.id, - std::move(rreader.reader), + std::move(rreader.batches), raft::replicate_options(raft::consistency_level::quorum_ack)) .get(); BOOST_REQUIRE(offset_r == invalid_producer_epoch); @@ -441,7 +442,7 @@ FIXTURE_TEST(test_tx_begin_fences_produce, rm_stm_test_fixture) { auto offset_r = stm .replicate( rreader.id, - std::move(rreader.reader), + std::move(rreader.batches), 
raft::replicate_options( raft::consistency_level::quorum_ack)) .get(); @@ -473,7 +474,7 @@ FIXTURE_TEST(test_tx_begin_fences_produce, rm_stm_test_fixture) { offset_r = stm .replicate( rreader.id, - std::move(rreader.reader), + std::move(rreader.batches), raft::replicate_options(raft::consistency_level::quorum_ack)) .get(); BOOST_REQUIRE(!(bool)offset_r); @@ -499,7 +500,7 @@ FIXTURE_TEST(test_tx_post_aborted_produce, rm_stm_test_fixture) { auto offset_r = stm .replicate( rreader.id, - std::move(rreader.reader), + std::move(rreader.batches), raft::replicate_options( raft::consistency_level::quorum_ack)) .get(); @@ -520,7 +521,7 @@ FIXTURE_TEST(test_tx_post_aborted_produce, rm_stm_test_fixture) { offset_r = stm .replicate( rreader.id, - std::move(rreader.reader), + std::move(rreader.batches), raft::replicate_options(raft::consistency_level::quorum_ack)) .get(); BOOST_REQUIRE((bool)offset_r); @@ -532,7 +533,7 @@ FIXTURE_TEST(test_tx_post_aborted_produce, rm_stm_test_fixture) { offset_r = stm .replicate( rreader.id, - std::move(rreader.reader), + std::move(rreader.batches), raft::replicate_options(raft::consistency_level::quorum_ack)) .get(); BOOST_REQUIRE(offset_r == invalid_producer_epoch); @@ -600,7 +601,7 @@ FIXTURE_TEST(test_aborted_transactions, rm_stm_test_fixture) { stm.begin_tx(pid, tx_seq, timeout, model::partition_id(0)).get()); auto rreader = make_rreader(pid, 0, 5, true); BOOST_REQUIRE( - stm.replicate(rreader.id, std::move(rreader.reader), opts).get()); + stm.replicate(rreader.id, std::move(rreader.batches), opts).get()); return pid; }; @@ -612,7 +613,7 @@ FIXTURE_TEST(test_aborted_transactions, rm_stm_test_fixture) { auto abort_tx = [&](auto pid) { auto rreader = make_rreader(pid, 5, 5, true); BOOST_REQUIRE( - stm.replicate(rreader.id, std::move(rreader.reader), opts).get()); + stm.replicate(rreader.id, std::move(rreader.batches), opts).get()); BOOST_REQUIRE_EQUAL( stm.abort_tx(pid, tx_seq, timeout).get(), cluster::tx::errc::none); }; @@ -716,7 +717,7 @@ 
FIXTURE_TEST(test_aborted_transactions, rm_stm_test_fixture) { auto rreader = make_rreader( model::producer_identity{-1, -1}, 0, 5, false); BOOST_REQUIRE( - stm.replicate(rreader.id, std::move(rreader.reader), opts).get()); + stm.replicate(rreader.id, std::move(rreader.batches), opts).get()); // roll and abort. roll_log(); diff --git a/src/v/cluster/tests/tx_compaction_utils.h b/src/v/cluster/tests/tx_compaction_utils.h index fb378333eb65b..b75ef3ae615a6 100644 --- a/src/v/cluster/tests/tx_compaction_utils.h +++ b/src/v/cluster/tests/tx_compaction_utils.h @@ -319,11 +319,11 @@ class tx_executor { .last_seq = spec.count - 1, .record_count = spec.count, .is_transactional = true}; - auto reader = model::make_memory_record_batch_reader( - std::move(batches[0])); + auto result = co_await _ctx._stm->replicate( bid, - std::move(reader), + chunked_vector::single( + std::move(batches[0])), raft::replicate_options(raft::consistency_level::quorum_ack)); if (!result.has_value()) { vlog(clusterlog.error, "Error {}", result.error()); diff --git a/src/v/cluster/tm_stm.cc b/src/v/cluster/tm_stm.cc index 3ddcf56dd926e..d74693ce5c5b3 100644 --- a/src/v/cluster/tm_stm.cc +++ b/src/v/cluster/tm_stm.cc @@ -32,8 +32,7 @@ ss::future> tm_stm::replicate_quorum_ack(model::term_id term, model::record_batch&& batch) { auto opts = raft::replicate_options{raft::consistency_level::quorum_ack}; opts.set_force_flush(); - return _raft->replicate( - term, model::make_memory_record_batch_reader(std::move(batch)), opts); + return _raft->replicate(term, std::move(batch), opts); } model::record_batch tm_stm::serialize_tx(tx_metadata tx) { @@ -135,11 +134,11 @@ ss::future> tm_stm::do_barrier() { }); } -model::record_batch_reader make_checkpoint() { +model::record_batch make_checkpoint() { storage::record_batch_builder builder( model::record_batch_type::checkpoint, model::offset(0)); builder.add_raw_kv(iobuf(), iobuf()); - return model::make_memory_record_batch_reader(std::move(builder).build()); + return 
std::move(builder).build(); } ss::future> diff --git a/src/v/datalake/coordinator/state_machine.cc b/src/v/datalake/coordinator/state_machine.cc index 93fd76895a8f1..c10c750f0f815 100644 --- a/src/v/datalake/coordinator/state_machine.cc +++ b/src/v/datalake/coordinator/state_machine.cc @@ -83,8 +83,7 @@ coordinator_stm::replicate_and_wait( model::term_id term, model::record_batch batch, ss::abort_source& as) { auto opts = raft::replicate_options{raft::consistency_level::quorum_ack}; opts.set_force_flush(); - auto res = co_await _raft->replicate( - term, model::make_memory_record_batch_reader(std::move(batch)), opts); + auto res = co_await _raft->replicate(term, std::move(batch), opts); if (res.has_error()) { co_return errc::raft_error; } diff --git a/src/v/datalake/translation/state_machine.cc b/src/v/datalake/translation/state_machine.cc index a4064c5325b37..c070fa01aac23 100644 --- a/src/v/datalake/translation/state_machine.cc +++ b/src/v/datalake/translation/state_machine.cc @@ -20,14 +20,13 @@ raft::replicate_options make_replicate_options() { return opts; } -model::record_batch_reader make_translation_state_batch(kafka::offset offset) { +model::record_batch make_translation_state_batch(kafka::offset offset) { auto val = datalake::translation::translation_state{ .highest_translated_offset = offset}; storage::record_batch_builder builder( model::record_batch_type::datalake_translation_state, model::offset(0)); builder.add_raw_kv(std::nullopt, serde::to_iobuf(val)); - auto batch = std::move(builder).build(); - return model::make_memory_record_batch_reader(std::move(batch)); + return std::move(builder).build(); } } // namespace diff --git a/src/v/kafka/data/partition_proxy.h b/src/v/kafka/data/partition_proxy.h index 3024fdcf53655..84ef1a3fd75a1 100644 --- a/src/v/kafka/data/partition_proxy.h +++ b/src/v/kafka/data/partition_proxy.h @@ -80,12 +80,12 @@ class partition_proxy { = 0; virtual ss::future> - replicate(model::record_batch_reader, raft::replicate_options) 
= 0; - + replicate(model::record_batch, raft::replicate_options) = 0; + virtual ss::future> replicate( + chunked_vector, raft::replicate_options) + = 0; virtual raft::replicate_stages replicate( - model::batch_identity, - model::record_batch_reader&&, - raft::replicate_options) + model::batch_identity, model::record_batch, raft::replicate_options) = 0; virtual result get_partition_info() const = 0; @@ -165,16 +165,22 @@ class partition_proxy { return _impl->get_partition_info(); } + ss::future> + replicate(model::record_batch batch, raft::replicate_options opts) const { + return _impl->replicate(std::move(batch), opts); + } + ss::future> replicate( - model::record_batch_reader r, raft::replicate_options opts) const { - return _impl->replicate(std::move(r), opts); + chunked_vector batches, + raft::replicate_options opts) const { + return _impl->replicate(std::move(batches), opts); } raft::replicate_stages replicate( model::batch_identity bi, - model::record_batch_reader&& r, + model::record_batch batch, raft::replicate_options opts) { - return _impl->replicate(bi, std::move(r), opts); + return _impl->replicate(bi, std::move(batch), opts); } private: diff --git a/src/v/kafka/data/replicated_partition.cc b/src/v/kafka/data/replicated_partition.cc index 3d3ff5b358993..5f62a55830083 100644 --- a/src/v/kafka/data/replicated_partition.cc +++ b/src/v/kafka/data/replicated_partition.cc @@ -326,15 +326,15 @@ replicated_partition::timequery(storage::timequery_config cfg) { // no further offset translation is required here. 
return _partition->timequery(cfg); } - ss::future> replicated_partition::replicate( - model::record_batch_reader rdr, raft::replicate_options opts) { + chunked_vector batches, raft::replicate_options opts) { using ret_t = result; if (_partition->is_read_replica_mode_enabled()) { return ss::make_ready_future( kafka::error_code::invalid_topic_exception); } - return _partition->replicate(std::move(rdr), opts) + + return _partition->replicate(std::move(batches), opts) .then([](result r) { if (!r) { return ret_t(r.error()); @@ -342,10 +342,15 @@ ss::future> replicated_partition::replicate( return ret_t(model::offset(r.value().last_offset())); }); } +ss::future> replicated_partition::replicate( + model::record_batch batch, raft::replicate_options opts) { + return replicate( + chunked_vector::single(std::move(batch)), opts); +} raft::replicate_stages replicated_partition::replicate( model::batch_identity batch_id, - model::record_batch_reader&& rdr, + model::record_batch batch, raft::replicate_options opts) { using ret_t = result; if (_partition->is_read_replica_mode_enabled()) { @@ -354,8 +359,10 @@ raft::replicate_stages replicated_partition::replicate( ss::make_ready_future>( make_error_code(kafka::error_code::invalid_topic_exception))}; } - - auto res = _partition->replicate_in_stages(batch_id, std::move(rdr), opts); + auto res = _partition->replicate_in_stages( + batch_id, + chunked_vector::single(std::move(batch)), + opts); raft::replicate_stages out(raft::errc::success); out.request_enqueued = std::move(res.request_enqueued); diff --git a/src/v/kafka/data/replicated_partition.h b/src/v/kafka/data/replicated_partition.h index 9f7e6bbfbf8b3..14fa546391c87 100644 --- a/src/v/kafka/data/replicated_partition.h +++ b/src/v/kafka/data/replicated_partition.h @@ -63,11 +63,13 @@ class replicated_partition final : public kafka::partition_proxy::impl { timequery(storage::timequery_config cfg) final; ss::future> - replicate(model::record_batch_reader, raft::replicate_options) 
final; + replicate(model::record_batch, raft::replicate_options) final; + ss::future> replicate( + chunked_vector, raft::replicate_options) final; raft::replicate_stages replicate( model::batch_identity, - model::record_batch_reader&&, + model::record_batch, raft::replicate_options) final; ss::future make_reader( diff --git a/src/v/kafka/server/group.cc b/src/v/kafka/server/group.cc index 1ac7f3ae35d5f..f95d964455a6b 100644 --- a/src/v/kafka/server/group.cc +++ b/src/v/kafka/server/group.cc @@ -1911,10 +1911,10 @@ group::begin_tx(cluster::begin_group_tx_request r) { // replicate fence batch - this is a transaction boundary model::record_batch batch = make_tx_fence_batch( r.pid, std::move(fence), use_dedicated_batch_type_for_fence()); - auto reader = model::make_memory_record_batch_reader(std::move(batch)); + auto result = co_await _partition->raft()->replicate( _term, - std::move(reader), + std::move(batch), raft::replicate_options(raft::consistency_level::quorum_ack)); if (!result) { @@ -2056,11 +2056,10 @@ group::store_txn_offsets(txn_offset_commit_request r) { prepared_tx_record_version, pid, std::move(tx_entry)); - auto reader = model::make_memory_record_batch_reader(std::move(batch)); auto result = co_await _partition->raft()->replicate( _term, - std::move(reader), + std::move(batch), raft::replicate_options(raft::consistency_level::quorum_ack)); if (!result) { @@ -2227,12 +2226,9 @@ group::offset_commit_stages group::store_offsets(offset_commit_request&& r) { offset_commit_response(r, error_code::none)); } - auto batch = std::move(builder).build(); - auto reader = model::make_memory_record_batch_reader(std::move(batch)); - auto replicate_stages = _partition->raft()->replicate_in_stages( _term, - std::move(reader), + chunked_vector::single(std::move(builder).build()), raft::replicate_options(raft::consistency_level::quorum_ack)); auto f = replicate_stages.replicate_finished.then( @@ -2602,12 +2598,11 @@ ss::future group::remove() { 
add_group_tombstone_record(_id, builder); auto batch = std::move(builder).build(); - auto reader = model::make_memory_record_batch_reader(std::move(batch)); try { auto result = co_await _partition->raft()->replicate( _term, - std::move(reader), + std::move(batch), raft::replicate_options(raft::consistency_level::quorum_ack)); if (result) { vlog( @@ -2689,12 +2684,11 @@ ss::future<> group::remove_topic_partitions( } auto batch = std::move(builder).build(); - auto reader = model::make_memory_record_batch_reader(std::move(batch)); try { auto result = co_await _partition->raft()->replicate( _term, - std::move(reader), + std::move(batch), raft::replicate_options(raft::consistency_level::quorum_ack)); if (result) { vlog( @@ -2729,7 +2723,7 @@ ss::future> group::store_group(model::record_batch batch) { return _partition->raft()->replicate( _term, - model::make_memory_record_batch_reader(std::move(batch)), + std::move(batch), raft::replicate_options(raft::consistency_level::quorum_ack)); } @@ -2959,11 +2953,10 @@ ss::future group::do_abort( aborted_tx_record_version, pid, std::move(tx)); - auto reader = model::make_memory_record_batch_reader(std::move(batch)); auto result = co_await _partition->raft()->replicate( _term, - std::move(reader), + std::move(batch), raft::replicate_options(raft::consistency_level::quorum_ack)); if (!result) { @@ -3082,7 +3075,7 @@ ss::future group::do_commit( // tx_gateway_frontend). So redpanda will eventually finish commit and // complete write for both this events. 
- model::record_batch_reader::data_t batches; + chunked_vector batches; batches.reserve(2); // if pending offsets are empty, (there was no store_txn_offsets call, do // not replicate the offsets update batch) @@ -3114,11 +3107,9 @@ ss::future group::do_commit( batches.push_back(std::move(batch)); - auto reader = model::make_memory_record_batch_reader(std::move(batches)); - auto result = co_await _partition->raft()->replicate( _term, - std::move(reader), + std::move(batches), raft::replicate_options(raft::consistency_level::quorum_ack)); if (!result) { diff --git a/src/v/kafka/server/group_manager.cc b/src/v/kafka/server/group_manager.cc index 017a39443b8f9..02c19d2122080 100644 --- a/src/v/kafka/server/group_manager.cc +++ b/src/v/kafka/server/group_manager.cc @@ -332,12 +332,11 @@ ss::future group_manager::delete_offsets( * already cleaned up. */ auto batch = std::move(builder).build(); - auto reader = model::make_memory_record_batch_reader(std::move(batch)); try { auto result = co_await group->partition()->raft()->replicate( group->term(), - std::move(reader), + std::move(batch), raft::replicate_options(raft::consistency_level::leader_ack)); if (result) { @@ -1024,12 +1023,11 @@ ss::future<> group_manager::write_version_fence( // cluster v9 is where offset retention is enabled auto batch = _feature_table.local().encode_version_fence( to_cluster_version(features::release_version::v23_1_1)); - auto reader = model::make_memory_record_batch_reader(std::move(batch)); try { auto result = co_await p->partition->raft()->replicate( term, - std::move(reader), + std::move(batch), raft::replicate_options(raft::consistency_level::quorum_ack)); if (result) { diff --git a/src/v/kafka/server/handlers/produce.cc b/src/v/kafka/server/handlers/produce.cc index 6a802ca9473d6..0e1430ee9caa4 100644 --- a/src/v/kafka/server/handlers/produce.cc +++ b/src/v/kafka/server/handlers/produce.cc @@ -111,18 +111,6 @@ acks_to_replicate_options(int16_t acks, std::chrono::milliseconds timeout) { 
}; } -static inline model::record_batch_reader -reader_from_lcore_batch(model::record_batch&& batch) { - /* - * The remainder of work for this partition is handled on its home - * core. The foreign memory record batch reader requires that once the - * reader is sent to the foreign core that it has exclusive access to the - * data in reader. That is true here and is generally trivial with readers - * that hold a copy of their data in memory. - */ - return model::make_foreign_memory_record_batch_reader(std::move(batch)); -} - static error_code map_produce_error_code(std::error_code ec) { if (ec.category() == raft::error_category()) { switch (static_cast(ec.value())) { @@ -177,13 +165,13 @@ static partition_produce_stages partition_append( model::partition_id id, ss::lw_shared_ptr partition, model::batch_identity bid, - model::record_batch_reader reader, + model::record_batch batch, int16_t acks, int32_t num_records, int64_t num_bytes, std::chrono::milliseconds timeout_ms) { auto stages = partition->replicate( - bid, std::move(reader), acks_to_replicate_options(acks, timeout_ms)); + bid, std::move(batch), acks_to_replicate_options(acks, timeout_ms)); return partition_produce_stages{ .dispatched = std::move(stages.request_enqueued), .produced = stages.replicate_finished.then_wrapped( @@ -356,7 +344,6 @@ static partition_produce_stages produce_topic_partition( auto bid = model::batch_identity::from(hdr); auto batch_size = batch.size_bytes(); auto num_records = batch.record_count(); - auto reader = reader_from_lcore_batch(std::move(batch)); auto validator = pandaproxy::schema_registry::maybe_make_schema_id_validator( octx.rctx.schema_registry(), topic.name, topic_cfg->properties); @@ -377,7 +364,7 @@ static partition_produce_stages produce_topic_partition( .invoke_on( *shard, octx.ssg, - [reader = std::move(reader), + [batch = std::move(batch), validator = std::move(validator), ntp = std::move(ntp), dispatch = std::move(dispatch), @@ -415,7 +402,7 @@ static 
partition_produce_stages produce_topic_partition( auto probe = std::addressof(partition->probe()); return pandaproxy::schema_registry::maybe_validate_schema_id( - std::move(validator), std::move(reader), probe) + std::move(validator), std::move(batch), probe) .then([ntp{std::move(ntp)}, partition{std::move(partition)}, dispatch = std::move(dispatch), @@ -424,10 +411,11 @@ static partition_produce_stages produce_topic_partition( source_shard, num_records, batch_size, - timeout](auto reader) mutable { - if (reader.has_error()) { + timeout](result + batch) mutable { + if (batch.has_error()) { return finalize_request_with_error_code( - reader.assume_error(), + batch.error(), std::move(dispatch), ntp, source_shard); @@ -437,7 +425,7 @@ static partition_produce_stages produce_topic_partition( ss::make_lw_shared( std::move(partition)), bid, - std::move(reader).assume_value(), + std::move(batch).value(), acks, num_records, batch_size, diff --git a/src/v/kafka/server/tests/fetch_test.cc b/src/v/kafka/server/tests/fetch_test.cc index 32057f39293e9..8f9e33e9ea8fb 100644 --- a/src/v/kafka/server/tests/fetch_test.cc +++ b/src/v/kafka/server/tests/fetch_test.cc @@ -434,11 +434,10 @@ FIXTURE_TEST(fetch_leader_epoch, redpanda_thread_fixture) { { auto batches = model::test::make_random_batches(model::offset(0), 5).get(); - auto rdr = model::make_memory_record_batch_reader( - std::move(batches)); + partition->raft() ->replicate( - std::move(rdr), + chunked_vector(std::move(batches)), raft::replicate_options( raft::consistency_level::quorum_ack)) .discard_result() @@ -447,13 +446,12 @@ FIXTURE_TEST(fetch_leader_epoch, redpanda_thread_fixture) { partition->raft()->step_down("trigger epoch change").get(); wait_for_leader(ntp, 10s).get(); { - auto batches - = model::test::make_random_batches(model::offset(0), 5).get(); - auto rdr = model::make_memory_record_batch_reader( - std::move(batches)); + auto batches = chunked_vector( + model::test::make_random_batches(model::offset(0), 
5).get()); + partition->raft() ->replicate( - std::move(rdr), + std::move(batches), raft::replicate_options( raft::consistency_level::quorum_ack)) .discard_result() @@ -531,10 +529,8 @@ FIXTURE_TEST(fetch_multi_partitions_debounce, redpanda_thread_fixture) { return model::test::make_random_batches(model::offset(0), 5) .then([ntp, &mgr](auto batches) { auto partition = mgr.get(ntp); - auto rdr = model::make_memory_record_batch_reader( - std::move(batches)); return partition->raft()->replicate( - std::move(rdr), + chunked_vector(std::move(batches)), raft::replicate_options( raft::consistency_level::quorum_ack)); }); @@ -600,10 +596,8 @@ FIXTURE_TEST(fetch_leader_ack, redpanda_thread_fixture) { return model::test::make_random_batches(model::offset(0), 5) .then([ntp, &mgr](auto batches) { auto partition = mgr.get(ntp); - auto rdr = model::make_memory_record_batch_reader( - std::move(batches)); return partition->raft()->replicate( - std::move(rdr), + chunked_vector(std::move(batches)), raft::replicate_options( raft::consistency_level::leader_ack)); }); @@ -659,10 +653,8 @@ FIXTURE_TEST(fetch_one_debounce, redpanda_thread_fixture) { return model::test::make_random_batches(model::offset(0), 5) .then([ntp, &mgr](auto batches) { auto partition = mgr.get(ntp); - auto rdr = model::make_memory_record_batch_reader( - std::move(batches)); return partition->raft()->replicate( - std::move(rdr), + chunked_vector(std::move(batches)), raft::replicate_options( raft::consistency_level::quorum_ack)); }); @@ -740,10 +732,8 @@ FIXTURE_TEST(fetch_multi_topics, redpanda_thread_fixture) { return model::test::make_random_batches(model::offset(0), 5) .then([ntp, &mgr](auto batches) { auto partition = mgr.get(ntp); - auto rdr = model::make_memory_record_batch_reader( - std::move(batches)); return partition->raft()->replicate( - std::move(rdr), + chunked_vector(std::move(batches)), raft::replicate_options( raft::consistency_level::quorum_ack)); }); @@ -793,10 +783,8 @@ 
FIXTURE_TEST(fetch_request_max_bytes, redpanda_thread_fixture) { return model::test::make_random_batches(model::offset(0), 20) .then([ntp, &mgr](auto batches) { auto partition = mgr.get(ntp); - auto rdr = model::make_memory_record_batch_reader( - std::move(batches)); return partition->raft()->replicate( - std::move(rdr), + chunked_vector(std::move(batches)), raft::replicate_options( raft::consistency_level::quorum_ack)); }); diff --git a/src/v/kafka/server/tests/replicated_partition_test.cc b/src/v/kafka/server/tests/replicated_partition_test.cc index a8710719e27a1..c1796e5a23b58 100644 --- a/src/v/kafka/server/tests/replicated_partition_test.cc +++ b/src/v/kafka/server/tests/replicated_partition_test.cc @@ -63,8 +63,8 @@ FIXTURE_TEST(test_replicated_partition_end_offset, redpanda_thread_fixture) { // replicate a batch that is subjected to offset translation return p ->replicate( - model::make_memory_record_batch_reader( - {std::move(builder).build()}), + chunked_vector::single( + std::move(builder).build()), raft::replicate_options(raft::consistency_level::quorum_ack)) .then([p, rp](result rr) { BOOST_REQUIRE(rr.has_value()); diff --git a/src/v/kafka/server/tests/topic_recreate_test.cc b/src/v/kafka/server/tests/topic_recreate_test.cc index 855646fea5ef1..c5c253489acca 100644 --- a/src/v/kafka/server/tests/topic_recreate_test.cc +++ b/src/v/kafka/server/tests/topic_recreate_test.cc @@ -260,12 +260,11 @@ FIXTURE_TEST(test_recreated_topic_does_not_lose_data, recreate_test_fixture) { [ntp](cluster::partition_manager& pm) { return model::test::make_random_batches(model::offset(0), 5) .then([&pm, ntp](auto batches) { - auto rdr = model::make_memory_record_batch_reader( - std::move(batches)); auto p = pm.get(ntp); return p->raft() ->replicate( - std::move(rdr), + chunked_vector( + std::move(batches)), raft::replicate_options( raft::consistency_level::quorum_ack)) .then([p](auto) { return p->committed_offset(); }); diff --git 
a/src/v/pandaproxy/schema_registry/validation.cc b/src/v/pandaproxy/schema_registry/validation.cc index 8c4cbcba25230..ed6fbf52ad98c 100644 --- a/src/v/pandaproxy/schema_registry/validation.cc +++ b/src/v/pandaproxy/schema_registry/validation.cc @@ -159,10 +159,6 @@ T combine( class schema_id_validator::impl { public: - using data_t = model::record_batch_reader::data_t; - using foreign_data_t = model::record_batch_reader::foreign_data_t; - using storage_t = model::record_batch_reader::storage_t; - impl( const std::unique_ptr& api, model::topic topic, @@ -359,18 +355,7 @@ class schema_id_validator::impl { co_return valid; } - ss::future validate(const data_t& data) { - for (const auto& b : data) { - if (!co_await validate(b)) { - co_return false; - } - } - co_return true; - } - - auto validate(const foreign_data_t& data) { return validate(*data.buffer); } - - ss::future operator()(model::record_batch_reader&& rbr) { + ss::future operator()(model::record_batch batch) { if (!_api) { // If Schema Registry is not enabled, the safe default is to reject co_return kafka::error_code::invalid_record; @@ -378,25 +363,16 @@ class schema_id_validator::impl { if ( config::shard_local_cfg().enable_schema_id_validation() == pandaproxy::schema_registry::schema_id_validation_mode::none) { - co_return std::move(rbr); + co_return std::move(batch); } - auto impl = std::move(rbr).release(); - auto slice = co_await impl->do_load_slice(model::no_timeout); - vassert( - impl->is_end_of_stream(), - "Attempt to validate schema id on a record_batch_reader with " - "multiple slices"); - - auto valid = co_await ss::visit( - slice, [this](const auto& d) { return validate(d); }); + auto valid = co_await validate(batch); if (!valid) { // It's possible that the schema registry doesn't have a newly // written schema, update and retry. 
co_await _api->_sequencer.local().read_sync(); - valid = co_await ss::visit( - slice, [this](const auto& d) { return validate(d); }); + valid = co_await validate(batch); } if (!valid) { @@ -414,7 +390,7 @@ class schema_id_validator::impl { co_return kafka::error_code::invalid_record; } - co_return model::make_memory_record_batch_reader(std::move(slice)); + co_return std::move(batch); } private: @@ -472,9 +448,9 @@ std::optional maybe_make_schema_id_validator( } ss::future schema_id_validator::operator()( - model::record_batch_reader&& rbr, cluster::partition_probe* probe) { + model::record_batch batch, cluster::partition_probe* probe) { using futurator = ss::futurize; - return (*_impl)(std::move(rbr)) + return (*_impl)(std::move(batch)) .handle_exception([](std::exception_ptr e) { vlog(plog.warn, "Invalid record due to exception: {}", e); return futurator::convert(kafka::error_code::invalid_record); diff --git a/src/v/pandaproxy/schema_registry/validation.h b/src/v/pandaproxy/schema_registry/validation.h index da9d7f00bed5e..ecba707c8849c 100644 --- a/src/v/pandaproxy/schema_registry/validation.h +++ b/src/v/pandaproxy/schema_registry/validation.h @@ -42,9 +42,9 @@ class schema_id_validator { schema_id_validator& operator=(const schema_id_validator&) = delete; ~schema_id_validator() noexcept; - using result = ::result; + using result = ::result; ss::future - operator()(model::record_batch_reader&&, cluster::partition_probe* probe); + operator()(model::record_batch, cluster::partition_probe* probe); private: std::unique_ptr _impl; @@ -57,12 +57,12 @@ std::optional maybe_make_schema_id_validator( inline ss::future maybe_validate_schema_id( std::optional validator, - model::record_batch_reader rbr, + model::record_batch batch, cluster::partition_probe* probe) { if (validator) { - co_return co_await (*validator)(std::move(rbr), probe); + co_return co_await (*validator)(std::move(batch), probe); } - co_return std::move(rbr); + co_return std::move(batch); } } // 
namespace pandaproxy::schema_registry diff --git a/src/v/raft/consensus.cc b/src/v/raft/consensus.cc index 28f92308b34f2..765ec95541893 100644 --- a/src/v/raft/consensus.cc +++ b/src/v/raft/consensus.cc @@ -813,28 +813,43 @@ consensus::chain_stages(replicate_stages stages) { }); } +ss::future> consensus::replicate( + chunked_vector batches, replicate_options opts) { + return chain_stages(do_replicate({}, std::move(batches), opts)); +} ss::future> -consensus::replicate(model::record_batch_reader&& rdr, replicate_options opts) { - return chain_stages(do_replicate({}, std::move(rdr), opts)); +consensus::replicate(model::record_batch batch, replicate_options opts) { + return chain_stages(do_replicate( + {}, chunked_vector::single(std::move(batch)), opts)); +} + +ss::future> consensus::replicate( + model::term_id expected_term, + model::record_batch batch, + replicate_options opts) { + return chain_stages(do_replicate( + expected_term, + chunked_vector::single(std::move(batch)), + opts)); } ss::future> consensus::replicate( model::term_id expected_term, - model::record_batch_reader&& rdr, + chunked_vector batches, replicate_options opts) { - return chain_stages(do_replicate(expected_term, std::move(rdr), opts)); + return chain_stages(do_replicate(expected_term, std::move(batches), opts)); } replicate_stages consensus::replicate_in_stages( - model::record_batch_reader&& rdr, replicate_options opts) { - return do_replicate({}, std::move(rdr), opts); + chunked_vector batches, replicate_options opts) { + return do_replicate({}, std::move(batches), opts); } replicate_stages consensus::replicate_in_stages( model::term_id expected_term, - model::record_batch_reader&& rdr, + chunked_vector batches, replicate_options opts) { - return do_replicate(expected_term, std::move(rdr), opts); + return do_replicate(expected_term, std::move(batches), opts); } replicate_stages @@ -851,7 +866,7 @@ wrap_stages_with_gate(ss::gate& gate, replicate_stages stages) { } replicate_stages 
consensus::do_replicate( std::optional expected_term, - model::record_batch_reader&& rdr, + chunked_vector batches, replicate_options opts) { // if gate is closed return fast, after this check we are certain that // `ss::with_gate` will succeed @@ -881,7 +896,7 @@ replicate_stages consensus::do_replicate( } return wrap_stages_with_gate( - _bg, _batcher.replicate(expected_term, std::move(rdr), opts)); + _bg, _batcher.replicate(expected_term, std::move(batches), opts)); } ss::future diff --git a/src/v/raft/consensus.h b/src/v/raft/consensus.h index 0bf3dfb7dfcde..810a23897d94e 100644 --- a/src/v/raft/consensus.h +++ b/src/v/raft/consensus.h @@ -248,9 +248,11 @@ class consensus { model::offset); ss::future> - replicate(model::record_batch_reader&&, replicate_options); - replicate_stages - replicate_in_stages(model::record_batch_reader&&, replicate_options); + replicate(chunked_vector, replicate_options); + ss::future> + replicate(model::record_batch, replicate_options); + replicate_stages replicate_in_stages( + chunked_vector, replicate_options); uint64_t get_snapshot_size() const { return _snapshot_size; } std::optional& stm_manager() { return _stm_manager; } @@ -278,10 +280,13 @@ class consensus { * d. cache the term * e. 
continue with step #1 */ + ss::future> replicate( + model::term_id, chunked_vector, replicate_options); ss::future> - replicate(model::term_id, model::record_batch_reader&&, replicate_options); + replicate(model::term_id, model::record_batch, replicate_options); replicate_stages replicate_in_stages( - model::term_id, model::record_batch_reader&&, replicate_options); + model::term_id, chunked_vector, replicate_options); + ss::future make_reader( storage::log_reader_config, std::optional = std::nullopt); @@ -588,7 +593,7 @@ class consensus { replicate_stages do_replicate( std::optional, - model::record_batch_reader&&, + chunked_vector, replicate_options); ss::future> chain_stages(replicate_stages); diff --git a/src/v/raft/mux_state_machine.h b/src/v/raft/mux_state_machine.h index 3a781cda79bf5..d7444c459e331 100644 --- a/src/v/raft/mux_state_machine.h +++ b/src/v/raft/mux_state_machine.h @@ -254,12 +254,12 @@ ss::future> mux_state_machine::replicate( if (term) { return _c->replicate( term.value(), - model::make_memory_record_batch_reader(std::move(batch)), + std::move(batch), raft::replicate_options{raft::consistency_level::quorum_ack}); } return _c->replicate( - model::make_memory_record_batch_reader(std::move(batch)), + std::move(batch), raft::replicate_options{raft::consistency_level::quorum_ack}); }); } diff --git a/src/v/raft/replicate_batcher.cc b/src/v/raft/replicate_batcher.cc index 652f0c350d93a..f60ba0ba2cb86 100644 --- a/src/v/raft/replicate_batcher.cc +++ b/src/v/raft/replicate_batcher.cc @@ -31,13 +31,13 @@ replicate_batcher::replicate_batcher(consensus* ptr, size_t cache_size) replicate_stages replicate_batcher::replicate( std::optional expected_term, - model::record_batch_reader r, + chunked_vector batches, replicate_options opts) { ss::promise<> enqueued; auto enqueued_f = enqueued.get_future(); auto f = cache_and_wait_for_result( - std::move(enqueued), expected_term, std::move(r), opts); + std::move(enqueued), expected_term, std::move(batches), 
opts); return {std::move(enqueued_f), std::move(f)}; } @@ -45,7 +45,7 @@ ss::future> replicate_batcher::cache_and_wait_for_result( ss::promise<> enqueued, std::optional expected_term, - model::record_batch_reader r, + chunked_vector r, replicate_options opts) { item_ptr item; try { @@ -109,13 +109,8 @@ ss::future<> replicate_batcher::stop() { ss::future replicate_batcher::do_cache( std::optional expected_term, - model::record_batch_reader r, + chunked_vector batches, replicate_options opts) { - auto batches = co_await model::consume_reader_to_chunked_vector( - std::move(r), - opts.timeout ? model::timeout_clock::now() + opts.timeout.value() - : model::no_timeout); - size_t bytes = std::accumulate( batches.cbegin(), batches.cend(), diff --git a/src/v/raft/replicate_batcher.h b/src/v/raft/replicate_batcher.h index 180f441d10c3d..3b7182e0efc80 100644 --- a/src/v/raft/replicate_batcher.h +++ b/src/v/raft/replicate_batcher.h @@ -125,7 +125,7 @@ class replicate_batcher { replicate_stages replicate( std::optional, - model::record_batch_reader, + chunked_vector, replicate_options); ss::future<> flush(ssx::semaphore_units u, const bool transfer_flush); @@ -141,7 +141,7 @@ class replicate_batcher { ss::future do_cache( std::optional, - model::record_batch_reader, + chunked_vector, replicate_options); ss::future do_cache_with_backpressure( @@ -153,7 +153,7 @@ class replicate_batcher { ss::future> cache_and_wait_for_result( ss::promise<> enqueued, std::optional expected_term, - model::record_batch_reader r, + chunked_vector r, replicate_options); consensus* _ptr; diff --git a/src/v/raft/state_machine.cc b/src/v/raft/state_machine.cc index 56dc711dada1c..5972284b41dd9 100644 --- a/src/v/raft/state_machine.cc +++ b/src/v/raft/state_machine.cc @@ -105,11 +105,11 @@ state_machine::batch_applicator::operator()(model::record_batch batch) { bool state_machine::stop_batch_applicator() { return _gate.is_closed(); } -model::record_batch_reader make_checkpoint() { +model::record_batch 
make_checkpoint() { storage::record_batch_builder builder( model::record_batch_type::checkpoint, model::offset(0)); builder.add_raw_kv(iobuf(), iobuf()); - return model::make_memory_record_batch_reader(std::move(builder).build()); + return std::move(builder).build(); } // FIXME: implement linearizable reads in a way similar to Logcabin diff --git a/src/v/raft/tests/append_entries_test.cc b/src/v/raft/tests/append_entries_test.cc index 2a34607206432..c2df962c600f7 100644 --- a/src/v/raft/tests/append_entries_test.cc +++ b/src/v/raft/tests/append_entries_test.cc @@ -322,7 +322,7 @@ FIXTURE_TEST(test_recovery_of_crashed_leader_truncation, raft_test_fixture) { // append some entries to leader log auto leader_raft = gr.get_member(first_leader_id).consensus; auto f = leader_raft->replicate( - random_batches_reader(2).get(), default_replicate_opts); + random_batches(2).get(), default_replicate_opts); leader_raft.release(); // since replicate doesn't accept timeout client have to deal with it. ss::with_timeout(model::timeout_clock::now() + 1s, std::move(f)) @@ -805,11 +805,9 @@ FIXTURE_TEST(test_big_batches_replication, raft_test_fixture) { auto value = bytes_to_iobuf(random_generators::get_bytes(3_MiB)); builder.add_raw_kv({}, std::move(value)); - auto rdr = model::make_memory_record_batch_reader( - {std::move(builder).build()}); return leader_node.consensus ->replicate( - std::move(rdr), + std::move(builder).build(), raft::replicate_options(raft::consistency_level::quorum_ack)) .then([](result res) { if (!res) { @@ -825,16 +823,16 @@ FIXTURE_TEST(test_big_batches_replication, raft_test_fixture) { validate_offset_translation(gr); } struct request_ordering_test_fixture : public raft_test_fixture { - model::record_batch_reader make_indexed_batch_reader(int32_t idx) { + chunked_vector make_indexed_batches(int32_t idx) { storage::record_batch_builder builder( model::record_batch_type::raft_data, model::offset(0)); iobuf buf; reflection::serialize(buf, idx); 
builder.add_raw_kv({}, std::move(buf)); - - return model::make_memory_record_batch_reader( - {std::move(builder).build()}); + chunked_vector batches; + batches.push_back(std::move(builder).build()); + return batches; } ss::future<> replicate_indexed_batches( @@ -842,8 +840,7 @@ struct request_ordering_test_fixture : public raft_test_fixture { std::vector>> results; for (int32_t i = 0; i < 20; ++i) { auto r = c->replicate_in_stages( - make_indexed_batch_reader(i), - raft::replicate_options(consistency)); + make_indexed_batches(i), raft::replicate_options(consistency)); // wait for request to be enqueued before dispatching next one // (comenting this out should cause this test to fail) r.request_enqueued.get(); diff --git a/src/v/raft/tests/basic_raft_fixture_test.cc b/src/v/raft/tests/basic_raft_fixture_test.cc index 2050b11ce0687..b1100bb3ace5c 100644 --- a/src/v/raft/tests/basic_raft_fixture_test.cc +++ b/src/v/raft/tests/basic_raft_fixture_test.cc @@ -63,19 +63,19 @@ TEST_F(raft_fixture, test_empty_writes) { create_simple_group(5).get(); auto leader = wait_for_leader(10s).get(); - auto replicate = [&](auto reader) { + auto replicate = [&](chunked_vector batches) { return node(leader).raft()->replicate( - std::move(reader), replicate_options{consistency_level::quorum_ack}); + std::move(batches), replicate_options{consistency_level::quorum_ack}); }; // no records storage::record_batch_builder builder( model::record_batch_type::raft_data, model::offset(0)); - auto reader = model::make_memory_record_batch_reader( - std::move(builder).build()); // Catch the error when appending. 
- auto res = replicate(std::move(reader)).get(); + auto res = replicate(chunked_vector::single( + std::move(builder).build())) + .get(); ASSERT_TRUE(res.has_error()); ASSERT_EQ(res.error(), errc::leader_append_failed); diff --git a/src/v/raft/tests/persisted_stm_test.cc b/src/v/raft/tests/persisted_stm_test.cc index a8ce495f8a418..99f93c234f15b 100644 --- a/src/v/raft/tests/persisted_stm_test.cc +++ b/src/v/raft/tests/persisted_stm_test.cc @@ -261,15 +261,13 @@ class persisted_kv : public persisted_stm<> { co_return serde::to_iobuf(std::move(inc_state)); }; - model::record_batch_reader - build_batch(std::vector operations) { + model::record_batch build_batch(std::vector operations) { storage::record_batch_builder builder( model::record_batch_type::raft_data, model::offset(0)); for (auto& op : operations) { builder.add_raw_kv(iobuf{}, serde::to_iobuf(std::move(op))); } - auto batch = std::move(builder).build(); - return model::make_memory_record_batch_reader({std::move(batch)}); + return std::move(builder).build(); } kv_state state; diff --git a/src/v/raft/tests/raft_fixture.h b/src/v/raft/tests/raft_fixture.h index 3baf664a2b462..e13534444b3f0 100644 --- a/src/v/raft/tests/raft_fixture.h +++ b/src/v/raft/tests/raft_fixture.h @@ -332,7 +332,7 @@ class raft_fixture ss::future<> create_simple_group(size_t number_of_nodes); - model::record_batch_reader + chunked_vector make_batches(std::vector> batch_spec) { const auto sz = batch_spec.size(); return make_batches(sz, [spec = std::move(batch_spec)](size_t idx) { @@ -346,21 +346,21 @@ class raft_fixture } template - model::record_batch_reader + chunked_vector make_batches(size_t batch_count, Generator&& generator) { - ss::circular_buffer batches; + chunked_vector batches; batches.reserve(batch_count); for (auto b_idx : boost::irange(batch_count)) { batches.push_back(generator(b_idx)); } - return model::make_memory_record_batch_reader(std::move(batches)); + return batches; } - model::record_batch_reader make_batches( + 
chunked_vector make_batches( size_t batch_count, size_t batch_record_count, size_t record_payload_size) { - ss::circular_buffer batches; + chunked_vector batches; batches.reserve(batch_count); for (auto b_idx : boost::irange(batch_count)) { storage::record_batch_builder builder( @@ -374,7 +374,7 @@ class raft_fixture batches.push_back(std::move(builder).build()); } - return model::make_memory_record_batch_reader(std::move(batches)); + return batches; } ss::future<> diff --git a/src/v/raft/tests/raft_group_fixture.h b/src/v/raft/tests/raft_group_fixture.h index 3882db6b5d304..0cd11c8c09ca1 100644 --- a/src/v/raft/tests/raft_group_fixture.h +++ b/src/v/raft/tests/raft_group_fixture.h @@ -568,30 +568,20 @@ struct raft_group { size_t _segment_size; }; -inline ss::future -random_batches_reader(int max_batches) { +inline ss::future> +random_batches(int max_batches) { auto batches = co_await model::test::make_random_batches( model::test::record_batch_spec{ .offset = model::offset(0), .allow_compression = true, .count = max_batches}); - co_return model::make_memory_record_batch_reader(std::move(batches)); + co_return chunked_vector(std::move(batches)); } -inline model::record_batch_reader -random_batch_reader(model::test::record_batch_spec spec) { - auto batch = model::test::make_random_batch(spec); - ss::circular_buffer batches; - batches.reserve(1); - batch.set_term(model::term_id(0)); - batches.push_back(std::move(batch)); - return model::make_memory_record_batch_reader(std::move(batches)); -} - -inline ss::future -random_batches_reader(model::test::record_batch_spec spec) { - auto batches = co_await model::test::make_random_batches(spec); - co_return model::make_memory_record_batch_reader(std::move(batches)); +inline ss::future> +random_batches(model::test::record_batch_spec spec) { + co_return chunked_vector( + co_await model::test::make_random_batches(spec)); } template @@ -805,10 +795,11 @@ inline ss::future replicate_random_batches( model::timeout_clock::duration 
tout = 1s) { return retry_with_leader( gr, 5, tout, [count, c_lvl](raft_node& leader_node) { - return random_batches_reader(count).then( - [&leader_node, c_lvl](auto rdr) { + return random_batches(count).then( + [&leader_node, c_lvl](chunked_vector batches) { raft::replicate_options opts(c_lvl); - return leader_node.consensus->replicate(std::move(rdr), opts) + return leader_node.consensus + ->replicate(std::move(batches), opts) .then([](result res) { if (!res) { return false; @@ -827,12 +818,13 @@ inline ss::future replicate_random_batches( model::timeout_clock::duration tout = 1s) { return retry_with_leader( gr, 5, tout, [count, expected_term, c_lvl](raft_node& leader_node) { - auto rdr = random_batches_reader(count); - return random_batches_reader(count).then( - [&leader_node, c_lvl, expected_term](auto rdr) { + auto rdr = random_batches(count); + return random_batches(count).then( + [&leader_node, c_lvl, expected_term]( + chunked_vector batches) { raft::replicate_options opts(c_lvl); return leader_node.consensus - ->replicate(expected_term, std::move(rdr), opts) + ->replicate(expected_term, std::move(batches), opts) .then([](result res) { if (!res) { return false; @@ -846,9 +838,9 @@ inline ss::future replicate_random_batches( /** * Makes compactible batches, having one record per batch */ -inline model::record_batch_reader +inline chunked_vector make_compactible_batches(int keys, size_t batches, model::timestamp ts) { - ss::circular_buffer ret; + chunked_vector ret; for (size_t b = 0; b < batches; b++) { int k = random_generators::get_int(0, keys); storage::record_batch_builder builder( @@ -866,7 +858,7 @@ make_compactible_batches(int keys, size_t batches, model::timestamp ts) { b.header().first_timestamp = ts; b.header().max_timestamp = ts; } - return model::make_memory_record_batch_reader(std::move(ret)); + return ret; } inline ss::future replicate_compactible_batches( diff --git a/src/v/raft/tests/stm_test_fixture.h b/src/v/raft/tests/stm_test_fixture.h 
index 027b042122327..01cc330a9de7f 100644 --- a/src/v/raft/tests/stm_test_fixture.h +++ b/src/v/raft/tests/stm_test_fixture.h @@ -178,7 +178,7 @@ struct state_machine_fixture : raft_fixture { [b = std::move(builder).build()]( raft_node_instance& leader_node) mutable { return leader_node.raft()->replicate( - model::make_memory_record_batch_reader(b.share()), + b.share(), raft::replicate_options(raft::consistency_level::quorum_ack)); }); } diff --git a/src/v/transform/rpc/service.cc b/src/v/transform/rpc/service.cc index 6827d5202b016..df1f197cc90af 100644 --- a/src/v/transform/rpc/service.cc +++ b/src/v/transform/rpc/service.cc @@ -154,15 +154,15 @@ ss::future> local_service::produce( co_return cluster::errc::invalid_request; } } - auto rdr = model::make_foreign_fragmented_memory_record_batch_reader( - std::move(batches)); // TODO: schema validation co_return co_await _partition_manager->invoke_on_shard( *shard, ntp, - [timeout, r = std::move(rdr)](kafka::partition_proxy* partition) mutable { + [timeout, + batches = chunked_vector(std::move(batches))]( + kafka::partition_proxy* partition) mutable { return partition - ->replicate(std::move(r), make_replicate_options(timeout)) + ->replicate(std::move(batches), make_replicate_options(timeout)) .then( [](result r) -> result { diff --git a/src/v/transform/rpc/tests/transform_rpc_test.cc b/src/v/transform/rpc/tests/transform_rpc_test.cc index 0debdc16ec42c..e74bd6efafa8f 100644 --- a/src/v/transform/rpc/tests/transform_rpc_test.cc +++ b/src/v/transform/rpc/tests/transform_rpc_test.cc @@ -623,9 +623,8 @@ class fake_partition_manager : public partition_manager { } ss::future> replicate( - model::record_batch_reader rdr, raft::replicate_options) final { - auto batches = co_await model::consume_reader_to_memory( - std::move(rdr), model::no_timeout); + chunked_vector batches, + raft::replicate_options) final { auto offset = latest_offset(); for (const auto& batch : batches) { auto b = batch.copy(); @@ -637,11 +636,15 @@ class 
fake_partition_manager : public partition_manager { raft::replicate_stages replicate( model::batch_identity, - model::record_batch_reader&&, + model::record_batch, raft::replicate_options) final { throw std::runtime_error("unimplemented"); } + ss::future> + replicate(model::record_batch, raft::replicate_options) final { + throw std::runtime_error("unimplemented"); + } result get_partition_info() const override { throw std::runtime_error("unimplemented"); }