Skip to content

Commit

Permalink
Merge branch 'master' into core-4967
Browse files Browse the repository at this point in the history
  • Loading branch information
yagagagaga authored Jan 13, 2025
2 parents 5c746b0 + ca6b229 commit dae5795
Show file tree
Hide file tree
Showing 1,256 changed files with 13,975 additions and 6,683 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/code-checks.yml
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ jobs:
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
with:
sh_checker_comment: true
sh_checker_exclude: .git .github ^docker ^thirdparty/src ^thirdparty/installed ^ui ^docs/node_modules ^tools/clickbench-tools ^extension ^output ^fs_brokers/apache_hdfs_broker/output (^|.*/)Dockerfile$ ^be/src/apache-orc ^be/src/clucene ^pytest ^samples
sh_checker_exclude: .git .github ^docker/compilation ^docker/runtime ^thirdparty/src ^thirdparty/installed ^ui ^docs/node_modules ^tools/clickbench-tools ^extension ^output ^fs_brokers/apache_hdfs_broker/output (^|.*/)Dockerfile$ ^be/src/apache-orc ^be/src/clucene ^pytest ^samples

preparation:
name: "Clang Tidy Preparation"
Expand Down
45 changes: 41 additions & 4 deletions be/src/agent/task_worker_pool.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -454,6 +454,7 @@ bvar::Adder<uint64_t> CLONE_count("task", "CLONE");
bvar::Adder<uint64_t> STORAGE_MEDIUM_MIGRATE_count("task", "STORAGE_MEDIUM_MIGRATE");
bvar::Adder<uint64_t> GC_BINLOG_count("task", "GC_BINLOG");
bvar::Adder<uint64_t> UPDATE_VISIBLE_VERSION_count("task", "UPDATE_VISIBLE_VERSION");
bvar::Adder<uint64_t> CALCULATE_DELETE_BITMAP_count("task", "CALCULATE_DELETE_BITMAP");

void add_task_count(const TAgentTaskRequest& task, int n) {
// clang-format off
Expand Down Expand Up @@ -481,6 +482,7 @@ void add_task_count(const TAgentTaskRequest& task, int n) {
ADD_TASK_COUNT(STORAGE_MEDIUM_MIGRATE)
ADD_TASK_COUNT(GC_BINLOG)
ADD_TASK_COUNT(UPDATE_VISIBLE_VERSION)
ADD_TASK_COUNT(CALCULATE_DELETE_BITMAP)
#undef ADD_TASK_COUNT
case TTaskType::REALTIME_PUSH:
case TTaskType::PUSH:
Expand Down Expand Up @@ -1657,11 +1659,46 @@ void drop_tablet_callback(CloudStorageEngine& engine, const TAgentTaskRequest& r
.tag("tablet_id", drop_tablet_req.tablet_id);
return;
});
// 1. erase lru from tablet mgr
// TODO(dx) clean tablet file cache
// get tablet's info(such as cachekey, tablet id, rsid)
MonotonicStopWatch watch;
watch.start();
auto weak_tablets = engine.tablet_mgr().get_weak_tablets();
std::ostringstream rowset_ids_stream;
bool found = false;
for (auto& weak_tablet : weak_tablets) {
auto tablet = weak_tablet.lock();
if (tablet == nullptr) {
continue;
}
if (tablet->tablet_id() != drop_tablet_req.tablet_id) {
continue;
}
found = true;
auto clean_rowsets = tablet->get_snapshot_rowset(true);
// Get first 10 rowset IDs as comma-separated string, just for log
int count = 0;
for (const auto& rowset : clean_rowsets) {
if (count >= 10) break;
if (count > 0) {
rowset_ids_stream << ",";
}
rowset_ids_stream << rowset->rowset_id().to_string();
count++;
}

CloudTablet::recycle_cached_data(std::move(clean_rowsets));
break;
}

if (!found) {
LOG(WARNING) << "tablet not found when dropping tablet_id=" << drop_tablet_req.tablet_id
<< ", cost " << static_cast<int64_t>(watch.elapsed_time()) / 1e9 << "(s)";
return;
}

engine.tablet_mgr().erase_tablet(drop_tablet_req.tablet_id);
// 2. gen clean file cache task
LOG(INFO) << "drop cloud tablet_id=" << drop_tablet_req.tablet_id
<< " and clean file cache first 10 rowsets {" << rowset_ids_stream.str() << "}, cost "
<< static_cast<int64_t>(watch.elapsed_time()) / 1e9 << "(s)";
return;
}

Expand Down
45 changes: 11 additions & 34 deletions be/src/cloud/cloud_meta_mgr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -340,6 +340,8 @@ static std::string debug_info(const Request& req) {
} else if constexpr (is_any_v<Request, RemoveDeleteBitmapUpdateLockRequest>) {
return fmt::format(" table_id={}, tablet_id={}, lock_id={}", req.table_id(),
req.tablet_id(), req.lock_id());
} else if constexpr (is_any_v<Request, GetDeleteBitmapRequest>) {
return fmt::format(" tablet_id={}", req.tablet_id());
} else {
static_assert(!sizeof(Request));
}
Expand Down Expand Up @@ -373,7 +375,11 @@ Status retry_rpc(std::string_view op_name, const Request& req, Response* res,
std::shared_ptr<MetaService_Stub> stub;
RETURN_IF_ERROR(proxy->get(&stub));
brpc::Controller cntl;
cntl.set_timeout_ms(config::meta_service_brpc_timeout_ms);
if (op_name == "get delete bitmap") {
cntl.set_timeout_ms(3 * config::meta_service_brpc_timeout_ms);
} else {
cntl.set_timeout_ms(config::meta_service_brpc_timeout_ms);
}
cntl.set_max_retry(kBrpcRetryTimes);
res->Clear();
(stub.get()->*method)(&cntl, &req, res, nullptr);
Expand Down Expand Up @@ -714,41 +720,12 @@ Status CloudMetaMgr::sync_tablet_delete_bitmap(CloudTablet* tablet, int64_t old_

VLOG_DEBUG << "send GetDeleteBitmapRequest: " << req.ShortDebugString();

int retry_times = 0;
MetaServiceProxy* proxy;
RETURN_IF_ERROR(MetaServiceProxy::get_proxy(&proxy));
auto start = std::chrono::high_resolution_clock::now();
while (true) {
std::shared_ptr<MetaService_Stub> stub;
RETURN_IF_ERROR(proxy->get(&stub));
// When there are many delete bitmaps that need to be synchronized, it
// may take a longer time, especially when loading the tablet for the
// first time, so set a relatively long timeout time.
brpc::Controller cntl;
cntl.set_timeout_ms(3 * config::meta_service_brpc_timeout_ms);
cntl.set_max_retry(kBrpcRetryTimes);
res.Clear();
stub->get_delete_bitmap(&cntl, &req, &res, nullptr);
if (cntl.Failed()) [[unlikely]] {
LOG_INFO("failed to get delete bitmap")
.tag("reason", cntl.ErrorText())
.tag("tablet_id", tablet->tablet_id())
.tag("partition_id", tablet->partition_id())
.tag("tried", retry_times);
proxy->set_unhealthy();
} else {
break;
}

if (++retry_times > config::delete_bitmap_rpc_retry_times) {
if (cntl.Failed()) {
return Status::RpcError("failed to get delete bitmap, tablet={} err={}",
tablet->tablet_id(), cntl.ErrorText());
}
break;
}
}
auto st = retry_rpc("get delete bitmap", req, &res, &MetaService_Stub::get_delete_bitmap);
auto end = std::chrono::high_resolution_clock::now();
if (st.code() == ErrorCode::THRIFT_RPC_ERROR) {
return st;
}

if (res.status().code() == MetaServiceCode::TABLET_NOT_FOUND) {
return Status::NotFound("failed to get delete bitmap: {}", res.status().msg());
Expand Down
4 changes: 2 additions & 2 deletions be/src/cloud/cloud_rowset_builder.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,8 @@ Status CloudRowsetBuilder::init() {
duration_cast<milliseconds>(system_clock::now().time_since_epoch()).count();

// build tablet schema in request level
_build_current_tablet_schema(_req.index_id, _req.table_schema_param.get(),
*_tablet->tablet_schema());
RETURN_IF_ERROR(_build_current_tablet_schema(_req.index_id, _req.table_schema_param.get(),
*_tablet->tablet_schema()));

RowsetWriterContext context;
context.txn_id = _req.txn_id;
Expand Down
7 changes: 7 additions & 0 deletions be/src/cloud/cloud_stream_load_executor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

#include "cloud/cloud_stream_load_executor.h"

#include <bvar/bvar.h>

#include "cloud/cloud_meta_mgr.h"
#include "cloud/cloud_storage_engine.h"
#include "cloud/config.h"
Expand All @@ -27,6 +29,10 @@

namespace doris {

bvar::Adder<uint64_t> stream_load_commit_retry_counter;
bvar::Window<bvar::Adder<uint64_t>> stream_load_commit_retry_counter_minute(
&stream_load_commit_retry_counter, 60);

enum class TxnOpParamType : int {
ILLEGAL,
WITH_TXN_ID,
Expand Down Expand Up @@ -114,6 +120,7 @@ Status CloudStreamLoadExecutor::commit_txn(StreamLoadContext* ctx) {
.tag("retry_times", retry_times)
.error(st);
retry_times++;
stream_load_commit_retry_counter << 1;
}
return st;
}
Expand Down
6 changes: 5 additions & 1 deletion be/src/cloud/cloud_tablet.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -386,6 +386,8 @@ void CloudTablet::delete_rowsets(const std::vector<RowsetSharedPtr>& to_delete,

uint64_t CloudTablet::delete_expired_stale_rowsets() {
std::vector<RowsetSharedPtr> expired_rowsets;
// ATTN: trick, Use stale_rowsets to temporarily increase the reference count of the rowset shared pointer in _stale_rs_version_map so that in the recycle_cached_data function, it checks if the reference count is 2.
std::vector<RowsetSharedPtr> stale_rowsets;
int64_t expired_stale_sweep_endtime =
::time(nullptr) - config::tablet_rowset_stale_sweep_time_sec;
std::vector<std::string> version_to_delete;
Expand All @@ -409,6 +411,7 @@ uint64_t CloudTablet::delete_expired_stale_rowsets() {
auto rs_it = _stale_rs_version_map.find(v_ts->version());
if (rs_it != _stale_rs_version_map.end()) {
expired_rowsets.push_back(rs_it->second);
stale_rowsets.push_back(rs_it->second);
LOG(INFO) << "erase stale rowset, tablet_id=" << tablet_id()
<< " rowset_id=" << rs_it->second->rowset_id().to_string()
<< " version=" << rs_it->first.to_string();
Expand Down Expand Up @@ -456,7 +459,8 @@ void CloudTablet::recycle_cached_data(const std::vector<RowsetSharedPtr>& rowset

if (config::enable_file_cache) {
for (const auto& rs : rowsets) {
if (rs.use_count() >= 1) {
// rowsets and tablet._rs_version_map each hold a rowset shared_ptr, so at this point, the reference count of the shared_ptr is at least 2.
if (rs.use_count() > 2) {
LOG(WARNING) << "Rowset " << rs->rowset_id().to_string() << " has "
<< rs.use_count()
<< " references. File Cache won't be recycled when query is using it.";
Expand Down
4 changes: 2 additions & 2 deletions be/src/cloud/cloud_tablet.h
Original file line number Diff line number Diff line change
Expand Up @@ -210,12 +210,12 @@ class CloudTablet final : public BaseTablet {

void build_tablet_report_info(TTabletInfo* tablet_info);

static void recycle_cached_data(const std::vector<RowsetSharedPtr>& rowsets);

private:
// FIXME(plat1ko): No need to record base size if rowsets are ordered by version
void update_base_size(const Rowset& rs);

static void recycle_cached_data(const std::vector<RowsetSharedPtr>& rowsets);

Status sync_if_not_running();

CloudStorageEngine& _engine;
Expand Down
4 changes: 2 additions & 2 deletions be/src/common/config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -529,7 +529,7 @@ DEFINE_String(ssl_private_key_path, "");
// Whether to check authorization
DEFINE_Bool(enable_all_http_auth, "false");
// Number of webserver workers
DEFINE_Int32(webserver_num_workers, "48");
DEFINE_Int32(webserver_num_workers, "128");

DEFINE_Bool(enable_single_replica_load, "true");
// Number of download workers for single replica load
Expand Down Expand Up @@ -915,7 +915,7 @@ DEFINE_String(rpc_load_balancer, "rr");

// a soft limit of string type length, the hard limit is 2GB - 4, but if too long will cause very low performance,
// so we set a soft limit, default is 1MB
DEFINE_mInt32(string_type_length_soft_limit_bytes, "1048576");
DEFINE_Int32(string_type_length_soft_limit_bytes, "1048576");

DEFINE_Validator(string_type_length_soft_limit_bytes,
[](const int config) -> bool { return config > 0 && config <= 2147483643; });
Expand Down
2 changes: 1 addition & 1 deletion be/src/common/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -586,7 +586,7 @@ DECLARE_mInt64(load_error_log_limit_bytes);

// be brpc interface is classified into two categories: light and heavy
// each category has diffrent thread number
// threads to handle heavy api interface, such as transmit_data/transmit_block etc
// threads to handle heavy api interface, such as transmit_block etc
DECLARE_Int32(brpc_heavy_work_pool_threads);
// threads to handle light api interface, such as exec_plan_fragment_prepare/exec_plan_fragment_start
DECLARE_Int32(brpc_light_work_pool_threads);
Expand Down
3 changes: 1 addition & 2 deletions be/src/exec/es/es_scroll_parser.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -401,8 +401,7 @@ Status insert_int_value(const rapidjson::Value& col, PrimitiveType type,
};

if (pure_doc_value && col.IsArray() && !col.Empty()) {
if (col.IsNumber()) {
RETURN_ERROR_IF_COL_IS_NOT_NUMBER(col[0], type);
if (col[0].IsNumber()) {
T value = (T)(sizeof(T) < 8 ? col[0].GetInt() : col[0].GetInt64());
col_ptr->insert_data(const_cast<const char*>(reinterpret_cast<char*>(&value)), 0);
return Status::OK();
Expand Down
2 changes: 0 additions & 2 deletions be/src/exec/schema_scanner/schema_helper.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,6 @@
#include "util/thrift_rpc_helper.h"

namespace doris {
class TDescribeTableParams;
class TDescribeTableResult;
class TDescribeTablesParams;
class TDescribeTablesResult;
class TGetDbsParams;
Expand Down
2 changes: 0 additions & 2 deletions be/src/exec/schema_scanner/schema_helper.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,6 @@
#include "common/status.h"

namespace doris {
class TDescribeTableParams;
class TDescribeTableResult;
class TDescribeTablesParams;
class TDescribeTablesResult;
class TGetDbsParams;
Expand Down
1 change: 1 addition & 0 deletions be/src/exprs/hybrid_set.h
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,7 @@ class HybridSetBase : public RuntimeFilterFuncBase {
_contains_null |= set->_contains_null;
}

bool empty() { return !_contains_null && size() == 0; }
virtual int size() = 0;
virtual bool find(const void* data) const = 0;
// use in vectorize execute engine
Expand Down
4 changes: 4 additions & 0 deletions be/src/http/action/stream_load.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,8 @@ DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(streaming_load_duration_ms, MetricUnit::MIL
DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(streaming_load_current_processing, MetricUnit::REQUESTS);

bvar::LatencyRecorder g_stream_load_receive_data_latency_ms("stream_load_receive_data_latency_ms");
bvar::LatencyRecorder g_stream_load_commit_and_publish_latency_ms("stream_load",
"commit_and_publish_ms");

static constexpr size_t MIN_CHUNK_SIZE = 64 * 1024;
static const string CHUNK = "chunked";
Expand Down Expand Up @@ -185,6 +187,8 @@ Status StreamLoadAction::_handle(std::shared_ptr<StreamLoadContext> ctx) {
int64_t commit_and_publish_start_time = MonotonicNanos();
RETURN_IF_ERROR(_exec_env->stream_load_executor()->commit_txn(ctx.get()));
ctx->commit_and_publish_txn_cost_nanos = MonotonicNanos() - commit_and_publish_start_time;
g_stream_load_commit_and_publish_latency_ms
<< ctx->commit_and_publish_txn_cost_nanos / 1000000;
}
return Status::OK();
}
Expand Down
9 changes: 9 additions & 0 deletions be/src/io/fs/local_file_system.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,8 @@ LocalFileSystem::~LocalFileSystem() = default;

Status LocalFileSystem::create_file_impl(const Path& file, FileWriterPtr* writer,
const FileWriterOptions* opts) {
VLOG_DEBUG << "create file: " << file.native()
<< ", sync_data: " << (opts ? opts->sync_file_data : true);
TEST_SYNC_POINT_RETURN_WITH_VALUE("LocalFileSystem::create_file_impl",
Status::IOError("inject io error"));
int fd = ::open(file.c_str(), O_TRUNC | O_WRONLY | O_CREAT | O_CLOEXEC, 0666);
Expand Down Expand Up @@ -108,6 +110,8 @@ Status LocalFileSystem::open_file_impl(const Path& file, FileReaderSPtr* reader,
}

Status LocalFileSystem::create_directory_impl(const Path& dir, bool failed_if_exists) {
VLOG_DEBUG << "create directory: " << dir.native()
<< ", failed_if_exists: " << failed_if_exists;
bool exists = true;
RETURN_IF_ERROR(exists_impl(dir, &exists));
if (exists && failed_if_exists) {
Expand All @@ -124,6 +128,7 @@ Status LocalFileSystem::create_directory_impl(const Path& dir, bool failed_if_ex
}

Status LocalFileSystem::delete_file_impl(const Path& file) {
VLOG_DEBUG << "delete file: " << file.native();
bool exists = true;
RETURN_IF_ERROR(exists_impl(file, &exists));
if (!exists) {
Expand All @@ -141,6 +146,7 @@ Status LocalFileSystem::delete_file_impl(const Path& file) {
}

Status LocalFileSystem::delete_directory_impl(const Path& dir) {
VLOG_DEBUG << "delete directory: " << dir.native();
bool exists = true;
RETURN_IF_ERROR(exists_impl(dir, &exists));
if (!exists) {
Expand Down Expand Up @@ -249,6 +255,7 @@ Status LocalFileSystem::list_impl(const Path& dir, bool only_file, std::vector<F
}

Status LocalFileSystem::rename_impl(const Path& orig_name, const Path& new_name) {
VLOG_DEBUG << "rename file: " << orig_name.native() << " to " << new_name.native();
TEST_SYNC_POINT_RETURN_WITH_VALUE("LocalFileSystem::rename",
Status::IOError("inject io error"));
std::error_code ec;
Expand All @@ -265,6 +272,7 @@ Status LocalFileSystem::link_file(const Path& src, const Path& dest) {
}

Status LocalFileSystem::link_file_impl(const Path& src, const Path& dest) {
VLOG_DEBUG << "link file: " << src.native() << " to " << dest.native();
if (::link(src.c_str(), dest.c_str()) != 0) {
return localfs_error(errno, fmt::format("failed to create hard link from {} to {}",
src.native(), dest.native()));
Expand Down Expand Up @@ -364,6 +372,7 @@ Status LocalFileSystem::copy_path(const Path& src, const Path& dest) {
}

Status LocalFileSystem::copy_path_impl(const Path& src, const Path& dest) {
VLOG_DEBUG << "copy from " << src.native() << " to " << dest.native();
std::error_code ec;
std::filesystem::copy(src, dest, std::filesystem::copy_options::recursive, ec);
if (ec) {
Expand Down
22 changes: 19 additions & 3 deletions be/src/olap/compaction.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -324,9 +324,25 @@ Status CompactionMixin::do_compact_ordered_rowsets() {

void CompactionMixin::build_basic_info() {
for (auto& rowset : _input_rowsets) {
_input_rowsets_data_size += rowset->data_disk_size();
_input_rowsets_index_size += rowset->index_disk_size();
_input_rowsets_total_size += rowset->total_disk_size();
const auto& rowset_meta = rowset->rowset_meta();
auto index_size = rowset_meta->index_disk_size();
auto total_size = rowset_meta->total_disk_size();
auto data_size = rowset_meta->data_disk_size();
// corrupted index size caused by bug before 2.1.5 or 3.0.0 version
// try to get real index size from disk.
if (index_size < 0 || index_size > total_size * 2) {
LOG(ERROR) << "invalid index size:" << index_size << " total size:" << total_size
<< " data size:" << data_size << " tablet:" << rowset_meta->tablet_id()
<< " rowset:" << rowset_meta->rowset_id();
index_size = 0;
auto st = rowset->get_inverted_index_size(&index_size);
if (!st.ok()) {
LOG(ERROR) << "failed to get inverted index size. res=" << st;
}
}
_input_rowsets_data_size += data_size;
_input_rowsets_index_size += index_size;
_input_rowsets_total_size += total_size;
_input_row_num += rowset->num_rows();
_input_num_segments += rowset->num_segments();
}
Expand Down
Loading

0 comments on commit dae5795

Please sign in to comment.