Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[BugFix] Fix duplicate entries in be_logs; Add reset_delvec in script; Add BE id in error message when query failed (backport #51204) #51351

Merged
merged 1 commit into from
Sep 27, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 12 additions & 6 deletions be/src/common/greplog.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
#include "gutil/strings/substitute.h"
#include "hs/hs_compile.h"
#include "hs/hs_runtime.h"
#include "service/backend_options.h"
#include "util/defer_op.h"

using namespace std;
Expand All @@ -33,16 +34,18 @@ namespace starrocks {
// Collects the full paths of the regular files found directly in `log_dir` whose file name
// matches the log-name pattern selected by `level`, and returns them sorted in descending
// string order. Logs a WARNING when no file matched.
// NOTE(review): this span is a rendered diff hunk, so both the pre-change and post-change
// versions of the `pattern` definition and of the file-name check appear below.
static std::vector<string> list_log_files_in_dir(const string& log_dir, char level) {
std::vector<string> files;
// if level is one of 'W', 'E', 'F' (WARNING, ERROR, FATAL), use the WARNING logs, else use the INFO logs
const std::string pattern = string("WEF").find(level) == string::npos ? "be.INFO.log." : "be.WARNING.log.";
// the pattern starts with "cn" or "be", chosen by BackendOptions::is_cn()
const std::string process = BackendOptions::is_cn() ? "cn" : "be";
const std::string pattern = process + (string("WEF").find(level) == string::npos ? ".INFO.log." : ".WARNING.log.");
for (const auto& entry : filesystem::directory_iterator(log_dir)) {
if (entry.is_regular_file()) {
auto name = entry.path().filename().string();
// prefix match: the file name must begin with the pattern and be longer than it
if (name.length() > pattern.length() && name.substr(0, pattern.length()) == pattern) {
// substring match: the pattern may appear anywhere in a file name longer than the pattern
if (name.length() > pattern.length() && name.find(pattern) != string::npos) {
files.push_back(entry.path().string());
}
}
}
// descending lexicographic order of the full path strings
std::sort(files.begin(), files.end(), std::greater<string>());
LOG_IF(WARNING, files.empty()) << "list_log_files_in_dir failed, no log files in " << log_dir;
return files;
}

Expand Down Expand Up @@ -180,7 +183,7 @@ Status grep_log_single_file(const string& path, int64_t start_ts, int64_t end_ts
ctx.line_len = read;
if (database == nullptr) {
// no pattern, add all lines
scan_by_line_handler(0, 0, 0, 0, &ctx);
scan_by_line_handler(0, 0, read, 0, &ctx);
} else {
if (hs_scan(database, line, read, 0, scratch, scan_by_line_handler, &ctx) != HS_SUCCESS) {
break;
Expand All @@ -195,6 +198,7 @@ Status grep_log_single_file(const string& path, int64_t start_ts, int64_t end_ts

Status grep_log(int64_t start_ts, int64_t end_ts, char level, const std::string& pattern, size_t limit,
std::deque<GrepLogEntry>& entries) {
level = std::toupper(level);
const string log_dir = config::sys_log_dir;
if (log_dir.empty()) {
return Status::InternalError(strings::Substitute("grep log failed $0 is empty", log_dir));
Expand All @@ -206,7 +210,8 @@ Status grep_log(int64_t start_ts, int64_t end_ts, char level, const std::string&
hs_database_t* database = nullptr;
if (!pattern.empty()) {
hs_compile_error_t* compile_err;
if (hs_compile(pattern.c_str(), 0, HS_MODE_BLOCK, NULL, &database, &compile_err) != HS_SUCCESS) {
if (hs_compile(pattern.c_str(), HS_FLAG_SINGLEMATCH, HS_MODE_BLOCK, nullptr, &database, &compile_err) !=
HS_SUCCESS) {
hs_free_compile_error(compile_err);
return Status::InternalError(
strings::Substitute("grep log failed compile pattern $0 failed $1", pattern, compile_err->message));
Expand Down Expand Up @@ -253,10 +258,11 @@ Status grep_log(int64_t start_ts, int64_t end_ts, char level, const std::string&
return Status::OK();
}

std::string grep_log_as_string(int64_t start_ts, int64_t end_ts, char level, const std::string& pattern, size_t limit) {
std::string grep_log_as_string(int64_t start_ts, int64_t end_ts, const std::string& level, const std::string& pattern,
size_t limit) {
std::ostringstream ss;
std::deque<GrepLogEntry> entries;
auto st = grep_log(start_ts, end_ts, level, pattern, limit, entries);
auto st = grep_log(start_ts, end_ts, level[0], pattern, limit, entries);
if (!st.ok()) {
ss << strings::Substitute("grep log failed $0 start_ts:$1 end_ts:$2 level:$3 pattern:$4 limit:$5\n",
st.to_string(), start_ts, end_ts, level, pattern, limit);
Expand Down
3 changes: 2 additions & 1 deletion be/src/common/greplog.h
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ Status grep_log(int64_t start_ts, int64_t end_ts, char level, const std::string&
* Grep log file and return all line as whole string, parameters are same as grep_log
* @return log string
*/
std::string grep_log_as_string(int64_t start_ts, int64_t end_ts, char level, const std::string& pattern, size_t limit);
std::string grep_log_as_string(int64_t start_ts, int64_t end_ts, const std::string& level, const std::string& pattern,
size_t limit);

} // namespace starrocks
4 changes: 4 additions & 0 deletions be/src/exec/pipeline/pipeline_driver_executor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

#include <memory>

#include "agent/master_info.h"
#include "exec/workgroup/work_group.h"
#include "gutil/strings/substitute.h"
#include "runtime/current_thread.h"
Expand Down Expand Up @@ -135,6 +136,9 @@ void GlobalDriverExecutor::_worker_thread() {
}

if (!status.ok()) {
auto o_id = get_backend_id();
int64_t be_id = o_id.has_value() ? o_id.value() : -1;
status = status.clone_and_append(fmt::format("BE:{}", be_id));
LOG(WARNING) << "[Driver] Process error, query_id=" << print_id(driver->query_ctx()->query_id())
<< ", instance_id=" << print_id(driver->fragment_ctx()->fragment_instance_id())
<< ", status=" << status;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ Status SchemaBeLogsScanner::start(RuntimeState* state) {
if (_param->log_end_ts > 0) {
end_ts = _param->log_end_ts;
}
string level;
string level = "I";
string pattern;
if (_param->log_level != nullptr) {
level = *_param->log_level;
Expand Down
2 changes: 1 addition & 1 deletion be/src/http/action/greplog_action.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ void GrepLogAction::handle(HttpRequest* req) {
return;
}

auto ret = grep_log_as_string(start_ts, end_ts, std::toupper(level[0]), pattern, limit);
auto ret = grep_log_as_string(start_ts, end_ts, level, pattern, limit);

HttpChannel::send_reply(req, HttpStatus::OK, ret);
}
Expand Down
12 changes: 12 additions & 0 deletions be/src/script/script.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
#include "io/io_profiler.h"
#include "runtime/exec_env.h"
#include "runtime/mem_tracker.h"
#include "storage/del_vector.h"
#include "storage/storage_engine.h"
#include "storage/tablet.h"
#include "storage/tablet_manager.h"
Expand Down Expand Up @@ -275,6 +276,16 @@ class StorageEngineRef {
}
}

// this method is specifically used to recover "no delete vector found" error caused by corrupt pk tablet metadata
static std::string reset_delvec(int64_t tablet_id, int64_t segment_id, int64_t version) {
auto tablet = get_tablet(tablet_id);
RETURN_IF_UNLIKELY_NULL(tablet, "tablet not found");
DelVector dv;
dv.init(version, nullptr, 0);
auto st = TabletMetaManager::set_del_vector(tablet->data_dir()->get_meta(), tablet_id, segment_id, dv);
return st.to_string();
}

static size_t submit_manual_compaction_task_for_table(int64_t table_id, int64_t rowset_size_threshold) {
auto infos = get_tablet_infos(table_id, -1);
for (auto& info : infos) {
Expand Down Expand Up @@ -452,6 +463,7 @@ class StorageEngineRef {
REG_STATIC_METHOD(StorageEngineRef, get_tablet_info);
REG_STATIC_METHOD(StorageEngineRef, get_tablet_infos);
REG_STATIC_METHOD(StorageEngineRef, get_tablet_meta_json);
REG_STATIC_METHOD(StorageEngineRef, reset_delvec);
REG_STATIC_METHOD(StorageEngineRef, get_tablet);
REG_STATIC_METHOD(StorageEngineRef, drop_tablet);
REG_STATIC_METHOD(StorageEngineRef, get_data_dirs);
Expand Down
9 changes: 8 additions & 1 deletion be/src/service/backend_options.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,14 @@ std::string BackendOptions::_s_localhost;
std::vector<CIDR> BackendOptions::_s_priority_cidrs;
TBackend BackendOptions::_backend;

bool BackendOptions::init() {
// Process-wide flag holding the `is_cn` argument most recently passed to
// BackendOptions::init(bool is_cn); false until init() has been called.
bool BackendOptions::_is_cn = false;

// Returns the flag stored by BackendOptions::init(bool is_cn).
bool BackendOptions::is_cn() {
return _is_cn;
}

bool BackendOptions::init(bool is_cn) {
_is_cn = is_cn;
if (!analyze_priority_cidrs()) {
return false;
}
Expand Down
4 changes: 3 additions & 1 deletion be/src/service/backend_options.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,11 @@ class CIDR;

class BackendOptions {
public:
static bool init();
static bool init(bool is_cn);
static std::string get_localhost();
static TBackend get_localBackend();
static void set_localhost(const std::string& host);
static bool is_cn();

private:
static bool analyze_priority_cidrs();
Expand All @@ -42,6 +43,7 @@ class BackendOptions {
static std::string _s_localhost;
static std::vector<CIDR> _s_priority_cidrs;
static TBackend _backend;
static bool _is_cn;

BackendOptions(const BackendOptions&) = delete;
const BackendOptions& operator=(const BackendOptions&) = delete;
Expand Down
2 changes: 1 addition & 1 deletion be/src/service/starrocks_main.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -254,7 +254,7 @@ int main(int argc, char** argv) {
EXIT_IF_ERROR(starrocks::JDBCDriverManager::getInstance()->init(std::string(getenv("STARROCKS_HOME")) +
"/lib/jdbc_drivers"));

if (!starrocks::BackendOptions::init()) {
if (!starrocks::BackendOptions::init(as_cn)) {
exit(-1);
}

Expand Down
7 changes: 7 additions & 0 deletions be/test/storage/tablet_updates_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
#include "fs/fs.h"
#include "gutil/strings/substitute.h"
#include "runtime/runtime_state.h"
#include "script/script.h"
#include "storage/chunk_helper.h"
#include "storage/empty_iterator.h"
#include "storage/kv_store.h"
Expand Down Expand Up @@ -1004,6 +1005,12 @@ void TabletUpdatesTest::test_writeread(bool enable_persistent_index) {
auto rs0 = create_rowset(_tablet, keys);
ASSERT_TRUE(_tablet->rowset_commit(2, rs0).ok());
ASSERT_EQ(2, _tablet->updates()->max_version());

string o;
ASSERT_TRUE(execute_script(fmt::format("StorageEngine.reset_delvec({}, {}, 2)", _tablet->tablet_id(), 0), o).ok());
ASSERT_TRUE(execute_script("System.print(ExecEnv.grep_log_as_string(0,0,\"I\",\"tablet_manager\",1))", o).ok());
LOG(INFO) << "grep log: " << o;

auto rs1 = create_rowset(_tablet, keys);
ASSERT_TRUE(_tablet->rowset_commit(3, rs1).ok());
ASSERT_EQ(3, _tablet->updates()->max_version());
Expand Down
Loading