Skip to content

Commit

Permalink
Fix corrupted wal number when predecessor wal corrupts + minor cleanup (
Browse files Browse the repository at this point in the history
#13359)

Summary:
**Context/Summary:**

02b4197 recently added the ability to detect WAL holes present in the predecessor WAL. However, it did not update the corrupted WAL number to point to the predecessor WAL in that corruption case. This PR fixes that.

As a bonus, this PR also fixes (1) the `FragmentBufferedReader()` constructor API, which now exposes fewer parameters because the removed ones are never explicitly passed in anywhere in the codebase, (2) the wording of an INFO log, (3) a parameter naming typo, and (4) the reporter naming.

Pull Request resolved: #13359

Test Plan:
1. Manual printing to verify that the corrupted WAL number is set to the correct value
2. Existing UTs

Reviewed By: jowlyzhang

Differential Revision: D69068089

Pulled By: hx235

fbshipit-source-id: f7f8a887cded2d3a26cf9982f5d1d1ab6a78e9e1
  • Loading branch information
hx235 authored and facebook-github-bot committed Feb 14, 2025
1 parent f6b2cdd commit affcad0
Show file tree
Hide file tree
Showing 12 changed files with 93 additions and 58 deletions.
40 changes: 25 additions & 15 deletions db/db_impl/db_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -162,15 +162,21 @@ class Directories {
std::unique_ptr<FSDirectory> wal_dir_;
};

// log::Reader::Reporter implementation that DBImpl::ProcessLogFile() creates
// and passes to InitializeLogReader() / ProcessLogRecord().
// Diff view: where two adjacent lines declare the same thing (the struct
// header, Corruption()), the first is the pre-change line and the second is
// its replacement.
struct DBOpenLogReporter : public log::Reader::Reporter {
struct DBOpenLogRecordReadReporter : public log::Reader::Reporter {
Env* env;
// Logger that Corruption() writes its warning to.
Logger* info_log;
// Name printed in the Corruption() warning message.
const char* fname;
// Receives the first corruption status reported while it is still OK.
Status* status; // nullptr if immutable_db_options_.paranoid_checks==false
// When non-null, set to true by OldLogRecord().
bool* old_log_record;
// Logs the corruption; if `status` is non-null and still OK, stores `s` in it
// and remembers `log_number` for GetCorruptedLogNumber().
void Corruption(size_t bytes, const Status& s) override;
void Corruption(size_t bytes, const Status& s,
uint64_t log_number = kMaxSequenceNumber) override;

void OldLogRecord(size_t bytes) override;

// Returns the log number recorded by Corruption(), or kMaxSequenceNumber if
// none has been recorded. HandleNonOkStatusOrOldLogRecord() treats
// kMaxSequenceNumber as "not set" and falls back to the current WAL number.
uint64_t GetCorruptedLogNumber() const { return corrupted_log_number_; }

private:
// kMaxSequenceNumber serves as the "no corrupted log recorded" sentinel.
uint64_t corrupted_log_number_ = kMaxSequenceNumber;
};

// While DB is the public interface of RocksDB, and DBImpl is the actual
Expand Down Expand Up @@ -2067,21 +2073,25 @@ class DBImpl : public DB {

void SetupLogFileProcessing(uint64_t wal_number);

Status InitializeLogReader(
uint64_t wal_number, bool is_retry, std::string& fname,
Status InitializeLogReader(uint64_t wal_number, bool is_retry,
std::string& fname,

bool stop_replay_for_corruption, uint64_t min_wal_number,
const PredecessorWALInfo& predecessor_wal_info,
bool* const old_log_record, Status* const reporter_status,
DBOpenLogReporter* reporter, std::unique_ptr<log::Reader>& reader);
bool stop_replay_for_corruption,
uint64_t min_wal_number,
const PredecessorWALInfo& predecessor_wal_info,
bool* const old_log_record,
Status* const reporter_status,
DBOpenLogRecordReadReporter* reporter,
std::unique_ptr<log::Reader>& reader);
Status ProcessLogRecord(
Slice record, const std::unique_ptr<log::Reader>& reader,
const UnorderedMap<uint32_t, size_t>& running_ts_sz, uint64_t wal_number,
const std::string& fname, bool read_only, int job_id,
std::function<void()> logFileDropped, DBOpenLogReporter* reporter,
uint64_t* record_checksum, SequenceNumber* last_seqno_observed,
SequenceNumber* next_sequence, bool* stop_replay_for_corruption,
Status* status, bool* stop_replay_by_wal_filter,
const std::function<void()>& logFileDropped,
DBOpenLogRecordReadReporter* reporter, uint64_t* record_checksum,
SequenceNumber* last_seqno_observed, SequenceNumber* next_sequence,
bool* stop_replay_for_corruption, Status* status,
bool* stop_replay_by_wal_filter,
std::unordered_map<int, VersionEdit>* version_edits, bool* flushed);

Status InitializeWriteBatchForLogRecord(
Expand All @@ -2106,9 +2116,9 @@ class DBImpl : public DB {

Status HandleNonOkStatusOrOldLogRecord(
uint64_t wal_number, SequenceNumber const* const next_sequence,
Status log_read_status, bool* old_log_record,
bool* stop_replay_for_corruption, uint64_t* corrupted_wal_number,
bool* corrupted_wal_found);
Status status, const DBOpenLogRecordReadReporter& reporter,
bool* old_log_record, bool* stop_replay_for_corruption,
uint64_t* corrupted_wal_number, bool* corrupted_wal_found);

Status UpdatePredecessorWALInfo(uint64_t wal_number,
const SequenceNumber last_seqno_observed,
Expand Down
33 changes: 21 additions & 12 deletions db/db_impl/db_impl_open.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1106,16 +1106,18 @@ bool DBImpl::InvokeWalFilterIfNeededOnWalRecord(uint64_t wal_number,
return true;
}

// Logs a warning with the file name, the number of dropped bytes and the
// corruption status. The message is prefixed with "(ignoring error) " when no
// status sink is configured.
// If a status sink is configured and still OK, it is set to `s` and the
// accompanying `log_number` is remembered. Once the sink holds a non-OK
// status, later calls only log and leave both values untouched.
// (Diff view: the first signature line is the pre-change one-line form.)
void DBOpenLogReporter::Corruption(size_t bytes, const Status& s) {
void DBOpenLogRecordReadReporter::Corruption(size_t bytes, const Status& s,
uint64_t log_number) {
// NOTE(review): `bytes` is narrowed to int for the "%d" format specifier.
ROCKS_LOG_WARN(info_log, "%s%s: dropping %d bytes; %s",
(status == nullptr ? "(ignoring error) " : ""), fname,
static_cast<int>(bytes), s.ToString().c_str());
if (status != nullptr && status->ok()) {
*status = s;
// Recorded together with the status so both describe the same corruption.
corrupted_log_number_ = log_number;
}
}

void DBOpenLogReporter::OldLogRecord(size_t bytes) {
void DBOpenLogRecordReadReporter::OldLogRecord(size_t bytes) {
if (old_log_record != nullptr) {
*old_log_record = true;
}
Expand Down Expand Up @@ -1229,7 +1231,7 @@ Status DBImpl::ProcessLogFile(
Status status;
bool old_log_record = false;

DBOpenLogReporter reporter;
DBOpenLogRecordReadReporter reporter;
std::unique_ptr<log::Reader> reader;

std::string fname =
Expand Down Expand Up @@ -1323,7 +1325,7 @@ Status DBImpl::ProcessLogFile(
}

ROCKS_LOG_INFO(immutable_db_options_.info_log,
"Recovered to log #%" PRIu64 " seq #%" PRIu64, wal_number,
"Recovered to log #%" PRIu64 " next seq #%" PRIu64, wal_number,
*next_sequence);

if (status.ok()) {
Expand All @@ -1333,7 +1335,7 @@ Status DBImpl::ProcessLogFile(

if (!status.ok() || old_log_record) {
status = HandleNonOkStatusOrOldLogRecord(
wal_number, next_sequence, status, &old_log_record,
wal_number, next_sequence, status, reporter, &old_log_record,
stop_replay_for_corruption, corrupted_wal_number, corrupted_wal_found);
}

Expand All @@ -1357,7 +1359,7 @@ Status DBImpl::InitializeLogReader(
uint64_t wal_number, bool is_retry, std::string& fname,
bool stop_replay_for_corruption, uint64_t min_wal_number,
const PredecessorWALInfo& predecessor_wal_info, bool* const old_log_record,
Status* const reporter_status, DBOpenLogReporter* reporter,
Status* const reporter_status, DBOpenLogRecordReadReporter* reporter,
std::unique_ptr<log::Reader>& reader) {
assert(old_log_record);
assert(reporter_status);
Expand Down Expand Up @@ -1408,10 +1410,11 @@ Status DBImpl::ProcessLogRecord(
Slice record, const std::unique_ptr<log::Reader>& reader,
const UnorderedMap<uint32_t, size_t>& running_ts_sz, uint64_t wal_number,
const std::string& fname, bool read_only, int job_id,
std::function<void()> logFileDropped, DBOpenLogReporter* reporter,
uint64_t* record_checksum, SequenceNumber* last_seqno_observed,
SequenceNumber* next_sequence, bool* stop_replay_for_corruption,
Status* status, bool* stop_replay_by_wal_filter,
const std::function<void()>& logFileDropped,
DBOpenLogRecordReadReporter* reporter, uint64_t* record_checksum,
SequenceNumber* last_seqno_observed, SequenceNumber* next_sequence,
bool* stop_replay_for_corruption, Status* status,
bool* stop_replay_by_wal_filter,
std::unordered_map<int, VersionEdit>* version_edits, bool* flushed) {
assert(reporter);
assert(last_seqno_observed);
Expand Down Expand Up @@ -1607,7 +1610,8 @@ Status DBImpl::MaybeWriteLevel0TableForRecovery(

Status DBImpl::HandleNonOkStatusOrOldLogRecord(
uint64_t wal_number, SequenceNumber const* const next_sequence,
Status status, bool* old_log_record, bool* stop_replay_for_corruption,
Status status, const DBOpenLogRecordReadReporter& reporter,
bool* old_log_record, bool* stop_replay_for_corruption,
uint64_t* corrupted_wal_number, bool* corrupted_wal_found) {
assert(!status.ok() || *old_log_record);

Expand Down Expand Up @@ -1641,7 +1645,12 @@ Status DBImpl::HandleNonOkStatusOrOldLogRecord(
// We should ignore the error but not continue replaying
*old_log_record = false;
*stop_replay_for_corruption = true;
*corrupted_wal_number = wal_number;
// TODO(hx235): have a single source of corrupted WAL number once we
// consolidate the statuses
uint64_t reporter_corrupted_wal_number = reporter.GetCorruptedLogNumber();
*corrupted_wal_number = reporter_corrupted_wal_number != kMaxSequenceNumber
? reporter_corrupted_wal_number
: wal_number;
if (corrupted_wal_found != nullptr) {
*corrupted_wal_found = true;
}
Expand Down
3 changes: 2 additions & 1 deletion db/db_impl/db_impl_secondary.h
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,8 @@ class LogReaderContainer {
Logger* info_log;
std::string fname;
Status* status; // nullptr if immutable_db_options_.paranoid_checks==false
void Corruption(size_t bytes, const Status& s) override {
void Corruption(size_t bytes, const Status& s,
uint64_t /*log_number*/ = kMaxSequenceNumber) override {
ROCKS_LOG_WARN(info_log, "%s%s: dropping %d bytes; %s",
(this->status == nullptr ? "(ignoring error) " : ""),
fname.c_str(), static_cast<int>(bytes),
Expand Down
3 changes: 2 additions & 1 deletion db/experimental.cc
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,8 @@ Status GetFileChecksumsFromCurrentManifest(FileSystem* fs,

struct LogReporter : public log::Reader::Reporter {
Status* status_ptr;
void Corruption(size_t /*bytes*/, const Status& st) override {
void Corruption(size_t /*bytes*/, const Status& st,
uint64_t /*log_number*/ = kMaxSequenceNumber) override {
if (status_ptr->ok()) {
*status_ptr = st;
}
Expand Down
22 changes: 14 additions & 8 deletions db/log_reader.cc
Original file line number Diff line number Diff line change
Expand Up @@ -374,14 +374,16 @@ void Reader::MaybeVerifyPredecessorWALInfo(
if (recorded_predecessor_log_number >= min_wal_number_to_keep_) {
std::string reason = "Missing WAL of log number " +
std::to_string(recorded_predecessor_log_number);
ReportCorruption(fragment.size(), reason.c_str());
ReportCorruption(fragment.size(), reason.c_str(),
recorded_predecessor_log_number);
}
} else {
if (observed_predecessor_wal_info_.GetLogNumber() !=
recorded_predecessor_log_number) {
std::string reason = "Missing WAL of log number " +
std::to_string(recorded_predecessor_log_number);
ReportCorruption(fragment.size(), reason.c_str());
ReportCorruption(fragment.size(), reason.c_str(),
recorded_predecessor_log_number);
} else if (observed_predecessor_wal_info_.GetLastSeqnoRecorded() !=
recorded_predecessor_wal_info.GetLastSeqnoRecorded()) {
std::string reason =
Expand All @@ -392,7 +394,8 @@ void Reader::MaybeVerifyPredecessorWALInfo(
std::to_string(
observed_predecessor_wal_info_.GetLastSeqnoRecorded()) +
". (Last sequence number equal to 0 indicates no WAL records)";
ReportCorruption(fragment.size(), reason.c_str());
ReportCorruption(fragment.size(), reason.c_str(),
recorded_predecessor_log_number);
} else if (observed_predecessor_wal_info_.GetSizeBytes() !=
recorded_predecessor_wal_info.GetSizeBytes()) {
std::string reason =
Expand All @@ -402,7 +405,8 @@ void Reader::MaybeVerifyPredecessorWALInfo(
" bytes. Observed " +
std::to_string(observed_predecessor_wal_info_.GetSizeBytes()) +
" bytes.";
ReportCorruption(fragment.size(), reason.c_str());
ReportCorruption(fragment.size(), reason.c_str(),
recorded_predecessor_log_number);
}
}
}
Expand Down Expand Up @@ -483,13 +487,15 @@ void Reader::UnmarkEOFInternal() {
}
}

// Wraps `reason` in a Status::Corruption and forwards it, together with the
// dropped byte count and the log number, to ReportDrop().
// (Diff view: the first signature/body pair is the pre-change version without
// `log_number`; the second is its replacement.)
void Reader::ReportCorruption(size_t bytes, const char* reason) {
ReportDrop(bytes, Status::Corruption(reason));
void Reader::ReportCorruption(size_t bytes, const char* reason,
uint64_t log_number) {
ReportDrop(bytes, Status::Corruption(reason), log_number);
}

// Passes the dropped byte count, the status describing why, and the log number
// to the reporter's Corruption() callback. Does nothing when no reporter is
// attached.
// (Diff view: the first signature line and the first Corruption() call are the
// pre-change versions without `log_number`.)
void Reader::ReportDrop(size_t bytes, const Status& reason) {
void Reader::ReportDrop(size_t bytes, const Status& reason,
uint64_t log_number) {
if (reporter_ != nullptr) {
reporter_->Corruption(bytes, reason);
reporter_->Corruption(bytes, reason, log_number);
}
}

Expand Down
26 changes: 13 additions & 13 deletions db/log_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,8 @@ class Reader {

// Some corruption was detected. "size" is the approximate number
// of bytes dropped due to the corruption.
virtual void Corruption(size_t bytes, const Status& status) = 0;
virtual void Corruption(size_t bytes, const Status& status,
uint64_t log_number = kMaxSequenceNumber) = 0;

virtual void OldLogRecord(size_t /*bytes*/) {}
};
Expand Down Expand Up @@ -220,8 +221,10 @@ class Reader {

// Reports dropped bytes to the reporter.
// buffer_ must be updated to remove the dropped bytes prior to invocation.
void ReportCorruption(size_t bytes, const char* reason);
void ReportDrop(size_t bytes, const Status& reason);
void ReportCorruption(size_t bytes, const char* reason,
uint64_t log_number = kMaxSequenceNumber);
void ReportDrop(size_t bytes, const Status& reason,
uint64_t log_number = kMaxSequenceNumber);
void ReportOldLogRecord(size_t bytes);

void InitCompression(const CompressionTypeRecord& compression_record);
Expand All @@ -236,17 +239,14 @@ class Reader {

class FragmentBufferedReader : public Reader {
public:
FragmentBufferedReader(
std::shared_ptr<Logger> info_log,
std::unique_ptr<SequentialFileReader>&& _file, Reporter* reporter,
bool checksum, uint64_t log_num, bool verify_and_track_wals = false,
bool stop_replay_for_corruption = false,
uint64_t min_wal_number_to_keep = std::numeric_limits<uint64_t>::max(),
const PredecessorWALInfo& observed_predecessor_wal_info =
PredecessorWALInfo())
FragmentBufferedReader(std::shared_ptr<Logger> info_log,
std::unique_ptr<SequentialFileReader>&& _file,
Reporter* reporter, bool checksum, uint64_t log_num)
: Reader(info_log, std::move(_file), reporter, checksum, log_num,
verify_and_track_wals, stop_replay_for_corruption,
min_wal_number_to_keep, observed_predecessor_wal_info),
false /*verify_and_track_wals*/,
false /*stop_replay_for_corruption*/,
std::numeric_limits<uint64_t>::max() /*min_wal_number_to_keep*/,
PredecessorWALInfo() /*observed_predecessor_wal_info*/),
fragments_(),
in_fragmented_record_(false) {}
~FragmentBufferedReader() override {}
Expand Down
6 changes: 4 additions & 2 deletions db/log_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,8 @@ class LogTest
std::string message_;

ReportCollector() : dropped_bytes_(0) {}
void Corruption(size_t bytes, const Status& status) override {
void Corruption(size_t bytes, const Status& status,
uint64_t /*log_number*/ = kMaxSequenceNumber) override {
dropped_bytes_ += bytes;
message_.append(status.ToString());
}
Expand Down Expand Up @@ -825,7 +826,8 @@ class RetriableLogTest : public ::testing::TestWithParam<int> {
std::string message_;

ReportCollector() : dropped_bytes_(0) {}
void Corruption(size_t bytes, const Status& status) override {
void Corruption(size_t bytes, const Status& status,
uint64_t /*log_number*/ = kMaxSequenceNumber) override {
dropped_bytes_ += bytes;
message_.append(status.ToString());
}
Expand Down
6 changes: 4 additions & 2 deletions db/repair.cc
Original file line number Diff line number Diff line change
Expand Up @@ -356,10 +356,12 @@ class Repairer {
Env* env;
std::shared_ptr<Logger> info_log;
uint64_t lognum;
// Logs the corruption at error level and returns; no status is recorded here.
// The log number printed is the reporter's own `lognum` unless the caller
// passed an explicit `log_number` (anything other than the kMaxSequenceNumber
// default).
// (Diff view: the first signature line and the bare `lognum, ...` argument
// line are the pre-change versions.)
void Corruption(size_t bytes, const Status& s) override {
void Corruption(size_t bytes, const Status& s,
uint64_t log_number = kMaxSequenceNumber) override {
// We print error messages for corruption, but continue repairing.
ROCKS_LOG_ERROR(info_log, "Log #%" PRIu64 ": dropping %d bytes; %s",
lognum, static_cast<int>(bytes), s.ToString().c_str());
log_number == kMaxSequenceNumber ? lognum : log_number,
static_cast<int>(bytes), s.ToString().c_str());
}
};

Expand Down
3 changes: 2 additions & 1 deletion db/transaction_log_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,8 @@ class TransactionLogIteratorImpl : public TransactionLogIterator {
struct LogReporter : public log::Reader::Reporter {
Env* env;
Logger* info_log;
// Logs the number of dropped bytes and the corruption status at error level.
// The log number argument is accepted for interface compatibility but unused,
// and no status is stored.
// (Diff view: the first signature line is the pre-change one-line form.)
void Corruption(size_t bytes, const Status& s) override {
void Corruption(size_t bytes, const Status& s,
uint64_t /*log_number*/ = kMaxSequenceNumber) override {
ROCKS_LOG_ERROR(info_log, "dropping %" ROCKSDB_PRIszt " bytes; %s", bytes,
s.ToString().c_str());
}
Expand Down
3 changes: 2 additions & 1 deletion db/version_set.h
Original file line number Diff line number Diff line change
Expand Up @@ -1585,7 +1585,8 @@ class VersionSet {

struct LogReporter : public log::Reader::Reporter {
Status* status;
void Corruption(size_t /*bytes*/, const Status& s) override {
void Corruption(size_t /*bytes*/, const Status& s,
uint64_t /*log_number*/ = kMaxSequenceNumber) override {
if (status->ok()) {
*status = s;
}
Expand Down
3 changes: 2 additions & 1 deletion db/wal_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -468,7 +468,8 @@ Status WalManager::ReadFirstLine(const std::string& fname,

Status* status;
bool ignore_error; // true if db_options_.paranoid_checks==false
void Corruption(size_t bytes, const Status& s) override {
void Corruption(size_t bytes, const Status& s,
uint64_t /*log_number*/ = kMaxSequenceNumber) override {
ROCKS_LOG_WARN(info_log, "[WalManager] %s%s: dropping %d bytes; %s",
(this->ignore_error ? "(ignoring error) " : ""), fname,
static_cast<int>(bytes), s.ToString().c_str());
Expand Down
3 changes: 2 additions & 1 deletion tools/ldb_cmd.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2768,7 +2768,8 @@ void ChangeCompactionStyleCommand::DoCommand() {
namespace {

// Reporter that writes each corruption's status text to stderr. The dropped
// byte count and the log number are ignored.
// (Diff view: the first Corruption() signature line is the pre-change form.)
struct StdErrReporter : public log::Reader::Reporter {
void Corruption(size_t /*bytes*/, const Status& s) override {
void Corruption(size_t /*bytes*/, const Status& s,
uint64_t /*log_number*/ = kMaxSequenceNumber) override {
std::cerr << "Corruption detected in log file " << s.ToString() << "\n";
}
};
Expand Down

0 comments on commit affcad0

Please sign in to comment.