Skip to content

Commit

Permalink
Merge pull request #24880 from andrwng/storage-offset-range-size-boun…
Browse files Browse the repository at this point in the history
…ds-fix

storage: fix bounds check in offset range size method
  • Loading branch information
andrwng authored Jan 22, 2025
2 parents 88fb0d2 + 69e4666 commit ca0b085
Show file tree
Hide file tree
Showing 3 changed files with 97 additions and 5 deletions.
15 changes: 12 additions & 3 deletions src/v/model/offset_interval.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,18 +27,27 @@ class bounded_offset_interval {
unchecked(model::offset min, model::offset max) noexcept {
return {min, max};
}

// Returns std::nullopt if the range is invalid: either bound is negative
// (min < 0 or max < 0) or the bounds are inverted (min > max). Otherwise
// returns the interval produced by unchecked(min, max).
static std::optional<bounded_offset_interval>
optional(model::offset min, model::offset max) {
    if (min < model::offset(0) || max < model::offset(0) || min > max) {
        return std::nullopt;
    }
    return unchecked(min, max);
}
// Builds an interval from the given bounds, delegating validation to
// optional() so that both factories apply the same rules.
// Throws std::invalid_argument when optional(min, max) yields no value.
static bounded_offset_interval
checked(model::offset min, model::offset max) {
    auto ret = optional(min, max);
    if (!ret.has_value()) {
        throw std::invalid_argument(fmt::format(
          "Invalid arguments for constructing a non-empty bounded offset "
          "interval: min({}) <= max({})",
          min,
          max));
    }
    return ret.value();
}

inline bool overlaps(const bounded_offset_interval& other) const noexcept {
Expand Down
28 changes: 26 additions & 2 deletions src/v/storage/disk_log_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
#include "model/adl_serde.h"
#include "model/fundamental.h"
#include "model/namespace.h"
#include "model/offset_interval.h"
#include "model/record_batch_types.h"
#include "model/timeout_clock.h"
#include "model/timestamp.h"
Expand Down Expand Up @@ -2097,12 +2098,23 @@ ss::future<size_t> disk_log_impl::get_file_offset(
ss::future<std::optional<log::offset_range_size_result_t>>
disk_log_impl::offset_range_size(
model::offset first, model::offset last, ss::io_priority_class io_priority) {
auto log_offsets = offsets();
vlog(
stlog.debug,
"Offset range size, first: {}, last: {}, lstat: {}",
first,
last,
offsets());
log_offsets);
auto log_interval = model::bounded_offset_interval::optional(
log_offsets.start_offset, log_offsets.committed_offset);
if (!log_interval.has_value()) {
vlog(stlog.debug, "Log is empty, returning early");
co_return std::nullopt;
}
if (!log_interval->contains(first) || !log_interval->contains(last)) {
vlog(stlog.debug, "Log does not include entire range");
co_return std::nullopt;
}

// build the collection
const auto segments = [&] {
Expand Down Expand Up @@ -2257,13 +2269,25 @@ disk_log_impl::offset_range_size(
model::offset first,
offset_range_size_requirements_t target,
ss::io_priority_class io_priority) {
auto log_offsets = offsets();
vlog(
stlog.debug,
"Offset range size, first: {}, target size: {}/{}, lstat: {}",
first,
target.target_size,
target.min_size,
offsets());
log_offsets);
auto log_interval = model::bounded_offset_interval::optional(
log_offsets.start_offset, log_offsets.committed_offset);
if (!log_interval.has_value()) {
vlog(stlog.debug, "Log is empty, returning early");
co_return std::nullopt;
}
if (!log_interval->contains(first)) {
vlog(stlog.debug, "Log does not include offset {}", first);
co_return std::nullopt;
}

auto base_it = _segs.lower_bound(first);

// Invariant: 'first' offset should be present in the log. If the segment is
Expand Down
59 changes: 59 additions & 0 deletions src/v/storage/tests/storage_e2e_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -4237,6 +4237,60 @@ FIXTURE_TEST(reader_reusability_max_bytes, storage_test_fixture) {
test_case(400000, 300000, false);
}

// Regression test: after a prefix truncation that lands in the middle of a
// segment, both offset_range_size() overloads must return std::nullopt for a
// starting offset that precedes the new log start.
FIXTURE_TEST(
  test_offset_range_size_after_mid_segment_truncation, storage_test_fixture) {
    constexpr size_t num_segments = 2;
    auto cfg = default_log_config(test_dir);
    storage::log_manager mgr = make_log_manager(cfg);
    info("Configuration: {}", mgr.config());
    // Stop the log manager when the test scope exits.
    auto deferred = ss::defer([&mgr]() mutable { mgr.stop().get(); });
    auto ntp = model::ntp("redpanda", "test-topic", 0);

    storage::ntp_config ntp_cfg(ntp, mgr.config().base_dir);

    auto log = mgr.manage(std::move(ntp_cfg)).get();
    // Append a set of batches and force a roll, once per desired segment.
    for (size_t i = 0; i < num_segments; i++) {
        append_random_batches(
          log,
          10,
          model::term_id(0),
          custom_ts_batch_generator(model::timestamp::now()));
        log->force_roll(ss::default_priority_class()).get();
    }

    // Prefix truncate such that offset 1 is the new log start.
    log
      ->truncate_prefix(storage::truncate_prefix_config(
        model::offset(1), ss::default_priority_class()))
      .get();

    // Run size queries on ranges that don't exist in the log, but whose range
    // is still included in a segment.

    // Explicit [first, last] overload: offset 0 is below the log start.
    BOOST_CHECK(
      log
        ->offset_range_size(
          model::offset(0), model::offset(1), ss::default_priority_class())
        .get()
      == std::nullopt);

    // Target-size overload: the starting offset 0 is below the log start.
    BOOST_CHECK(
      log
        ->offset_range_size(
          model::offset(0),
          storage::log::offset_range_size_requirements_t{
            .target_size = 1,
            .min_size = 0,
          },
          ss::default_priority_class())
        .get()
      == std::nullopt);
}

FIXTURE_TEST(test_offset_range_size, storage_test_fixture) {
#ifdef NDEBUG
size_t num_test_cases = 5000;
Expand Down Expand Up @@ -4907,6 +4961,7 @@ FIXTURE_TEST(test_offset_range_size2_compacted, storage_test_fixture) {
BOOST_REQUIRE(result->on_disk_size >= target_size);
}

info("Prefix truncating");
auto new_start_offset = model::next_offset(first_segment_last_offset);
log
->truncate_prefix(storage::truncate_prefix_config(
Expand All @@ -4918,6 +4973,7 @@ FIXTURE_TEST(test_offset_range_size2_compacted, storage_test_fixture) {

// Check that out-of-range access returns std::nullopt.

info("Checking for null on out-of-range");
BOOST_REQUIRE(
log
->offset_range_size(
Expand Down Expand Up @@ -4959,6 +5015,7 @@ FIXTURE_TEST(test_offset_range_size2_compacted, storage_test_fixture) {
.get()
== std::nullopt);

info("Checking the last batch");
// Check that the last batch can be measured independently
auto res = log
->offset_range_size(
Expand All @@ -4980,6 +5037,7 @@ FIXTURE_TEST(test_offset_range_size2_compacted, storage_test_fixture) {
size_t tail_length = 5;

for (size_t i = 0; i < tail_length; i++) {
info("Checking i = {}", i);
auto ix_batch = c_summaries.size() - 1 - i;
res = log
->offset_range_size(
Expand All @@ -4997,6 +5055,7 @@ FIXTURE_TEST(test_offset_range_size2_compacted, storage_test_fixture) {
}

// Check that the min_size is respected
info("Checking the back segment");
BOOST_REQUIRE(
log
->offset_range_size(
Expand Down

0 comments on commit ca0b085

Please sign in to comment.