Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

storage: fix bounds check in offset range size method #24880

Open
wants to merge 3 commits into
base: dev
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 12 additions & 3 deletions src/v/model/offset_interval.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,18 +27,27 @@ class bounded_offset_interval {
unchecked(model::offset min, model::offset max) noexcept {
return {min, max};
}

// Returns std::nullopt if the range is invalid (e.g. invalid start,
// invalid end, or end > start).
static std::optional<bounded_offset_interval>
optional(model::offset min, model::offset max) {
if (min < model::offset(0) || max < model::offset(0) || min > max) {
return std::nullopt;
}
return unchecked(min, max);
}
static bounded_offset_interval
checked(model::offset min, model::offset max) {
if (min < model::offset(0) || max < model::offset(0) || min > max) {
auto ret = optional(min, max);
if (!ret.has_value()) {
throw std::invalid_argument(fmt::format(
"Invalid arguments for constructing a non-empty bounded offset "
"interval: min({}) <= max({})",
min,
max));
}

return {min, max};
return ret.value();
}

inline bool overlaps(const bounded_offset_interval& other) const noexcept {
Expand Down
28 changes: 26 additions & 2 deletions src/v/storage/disk_log_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
#include "model/adl_serde.h"
#include "model/fundamental.h"
#include "model/namespace.h"
#include "model/offset_interval.h"
#include "model/record_batch_types.h"
#include "model/timeout_clock.h"
#include "model/timestamp.h"
Expand Down Expand Up @@ -1981,12 +1982,23 @@ ss::future<size_t> disk_log_impl::get_file_offset(
ss::future<std::optional<log::offset_range_size_result_t>>
disk_log_impl::offset_range_size(
model::offset first, model::offset last, ss::io_priority_class io_priority) {
auto log_offsets = offsets();
vlog(
stlog.debug,
"Offset range size, first: {}, last: {}, lstat: {}",
first,
last,
offsets());
log_offsets);
auto log_interval = model::bounded_offset_interval::optional(
log_offsets.start_offset, log_offsets.committed_offset);
if (!log_interval.has_value()) {
vlog(stlog.debug, "Log is empty, returning early");
co_return std::nullopt;
}
if (!log_interval->contains(first) || !log_interval->contains(last)) {
vlog(stlog.debug, "Log does not include entire range");
co_return std::nullopt;
}

// build the collection
const auto segments = [&] {
Expand Down Expand Up @@ -2141,13 +2153,25 @@ disk_log_impl::offset_range_size(
model::offset first,
offset_range_size_requirements_t target,
ss::io_priority_class io_priority) {
auto log_offsets = offsets();
vlog(
stlog.debug,
"Offset range size, first: {}, target size: {}/{}, lstat: {}",
first,
target.target_size,
target.min_size,
offsets());
log_offsets);
auto log_interval = model::bounded_offset_interval::optional(
log_offsets.start_offset, log_offsets.committed_offset);
if (!log_interval.has_value()) {
vlog(stlog.debug, "Log is empty, returning early");
co_return std::nullopt;
}
if (!log_interval->contains(first)) {
vlog(stlog.debug, "Log does not include offset {}", first);
co_return std::nullopt;
}

auto base_it = _segs.lower_bound(first);

// Invariant: 'first' offset should be present in the log. If the segment is
Expand Down
59 changes: 59 additions & 0 deletions src/v/storage/tests/storage_e2e_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -4237,6 +4237,60 @@ FIXTURE_TEST(reader_reusability_max_bytes, storage_test_fixture) {
test_case(400000, 300000, false);
}

FIXTURE_TEST(
test_offset_range_size_after_mid_segment_truncation, storage_test_fixture) {
size_t num_segments = 2;
model::offset first_segment_last_offset;
auto cfg = default_log_config(test_dir);
storage::log_manager mgr = make_log_manager(cfg);
info("Configuration: {}", mgr.config());
auto deferred = ss::defer([&mgr]() mutable { mgr.stop().get(); });
auto ntp = model::ntp("redpanda", "test-topic", 0);

storage::ntp_config ntp_cfg(ntp, mgr.config().base_dir);

auto log = mgr.manage(std::move(ntp_cfg)).get();
for (size_t i = 0; i < num_segments; i++) {
append_random_batches(
log,
10,
model::term_id(0),
custom_ts_batch_generator(model::timestamp::now()));
if (first_segment_last_offset == model::offset{}) {
first_segment_last_offset = log->offsets().dirty_offset;
}
log->force_roll(ss::default_priority_class()).get();
}

// Prefix truncate such that offset 1 is the new log start.
log
->truncate_prefix(storage::truncate_prefix_config(
model::offset(1), ss::default_priority_class()))
.get();

// Run size queries on ranges that don't exist in the log, but whose range
// is still included in a segment.

BOOST_CHECK(
log
->offset_range_size(
model::offset(0), model::offset(1), ss::default_priority_class())
.get()
== std::nullopt);

BOOST_CHECK(
log
->offset_range_size(
model::offset(0),
storage::log::offset_range_size_requirements_t{
.target_size = 1,
.min_size = 0,
},
ss::default_priority_class())
.get()
== std::nullopt);
}

FIXTURE_TEST(test_offset_range_size, storage_test_fixture) {
#ifdef NDEBUG
size_t num_test_cases = 5000;
Expand Down Expand Up @@ -4907,6 +4961,7 @@ FIXTURE_TEST(test_offset_range_size2_compacted, storage_test_fixture) {
BOOST_REQUIRE(result->on_disk_size >= target_size);
}

info("Prefix truncating");
auto new_start_offset = model::next_offset(first_segment_last_offset);
log
->truncate_prefix(storage::truncate_prefix_config(
Expand All @@ -4918,6 +4973,7 @@ FIXTURE_TEST(test_offset_range_size2_compacted, storage_test_fixture) {

// Check that out of range access triggers exception.

info("Checking for null on out-of-range");
BOOST_REQUIRE(
log
->offset_range_size(
Expand Down Expand Up @@ -4959,6 +5015,7 @@ FIXTURE_TEST(test_offset_range_size2_compacted, storage_test_fixture) {
.get()
== std::nullopt);

info("Checking the last batch");
// Check that the last batch can be measured independently
auto res = log
->offset_range_size(
Expand All @@ -4980,6 +5037,7 @@ FIXTURE_TEST(test_offset_range_size2_compacted, storage_test_fixture) {
size_t tail_length = 5;

for (size_t i = 0; i < tail_length; i++) {
info("Checking i = {}", i);
auto ix_batch = c_summaries.size() - 1 - i;
res = log
->offset_range_size(
Expand All @@ -4997,6 +5055,7 @@ FIXTURE_TEST(test_offset_range_size2_compacted, storage_test_fixture) {
}

// Check that the min_size is respected
info("Checking the back segment");
BOOST_REQUIRE(
log
->offset_range_size(
Expand Down
Loading