Skip to content

Commit

Permalink
dynamic adjust compaction policy (apache#5651)
Browse files Browse the repository at this point in the history
Co-authored-by: weizuo <[email protected]>
  • Loading branch information
weizuo93 and weizuo authored Apr 26, 2021
1 parent 1783dbf commit e519a24
Show file tree
Hide file tree
Showing 14 changed files with 96 additions and 46 deletions.
2 changes: 1 addition & 1 deletion be/src/common/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -247,7 +247,7 @@ CONF_mInt32(base_compaction_write_mbytes_per_sec, "5");
// num_based policy, the original version of cumulative compaction, cumulative version compaction once.
// size_based policy, a optimization version of cumulative compaction, targeting the use cases requiring
// lower write amplification, trading off read amplification and space amplification.
CONF_String(cumulative_compaction_policy, "size_based");
CONF_mString(cumulative_compaction_policy, "size_based");
CONF_Validator(cumulative_compaction_policy, [](const std::string config) -> bool {
return config == "size_based" || config == "num_based";
});
Expand Down
9 changes: 9 additions & 0 deletions be/src/common/configbase.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@ std::map<std::string, std::string>* full_conf_map = nullptr;

std::mutex custom_conf_lock;

std::mutex mutable_string_config_lock;

// trim string
std::string& trim(std::string& s) {
// rtrim
Expand Down Expand Up @@ -416,11 +418,18 @@ Status set_config(const std::string& field, const std::string& value, bool need_
UPDATE_FIELD(it->second, value, int32_t, need_persist);
UPDATE_FIELD(it->second, value, int64_t, need_persist);
UPDATE_FIELD(it->second, value, double, need_persist);
{
// add lock to ensure thread safe
std::lock_guard<std::mutex> lock(mutable_string_config_lock);
UPDATE_FIELD(it->second, value, std::string, need_persist);
}

// The other types are not thread safe to change dynamically.
return Status::NotSupported(strings::Substitute(
"'$0' is type of '$1' which is not support to modify", field, it->second.type));
}

std::mutex* get_mutable_string_config_lock() { return &mutable_string_config_lock; }

} // namespace config
} // namespace doris
4 changes: 4 additions & 0 deletions be/src/common/configbase.h
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,7 @@ class RegisterConfValidator {
#define CONF_mInt32(name, defaultstr) DEFINE_FIELD(int32_t, name, defaultstr, true)
#define CONF_mInt64(name, defaultstr) DEFINE_FIELD(int64_t, name, defaultstr, true)
#define CONF_mDouble(name, defaultstr) DEFINE_FIELD(double, name, defaultstr, true)
#define CONF_mString(name, defaultstr) DEFINE_FIELD(std::string, name, defaultstr, true)
#define CONF_Validator(name, validator) DEFINE_VALIDATOR(name, validator)

#else
Expand All @@ -136,6 +137,7 @@ class RegisterConfValidator {
#define CONF_mInt32(name, defaultstr) DECLARE_FIELD(int32_t, name)
#define CONF_mInt64(name, defaultstr) DECLARE_FIELD(int64_t, name)
#define CONF_mDouble(name, defaultstr) DECLARE_FIELD(double, name)
#define CONF_mString(name, defaultstr) DECLARE_FIELD(std::string, name)
#define CONF_Validator(name, validator) DECLARE_VALIDATOR(name)
#endif

Expand Down Expand Up @@ -177,6 +179,8 @@ Status set_config(const std::string& field, const std::string& value, bool need_

bool persist_config(const std::string& field, const std::string& value);

std::mutex* get_mutable_string_config_lock();

} // namespace config
} // namespace doris

Expand Down
1 change: 1 addition & 0 deletions be/src/http/default_path_handlers.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ void logs_handler(const WebPageHandler::ArgumentMap& args, std::stringstream* ou
void config_handler(const WebPageHandler::ArgumentMap& args, std::stringstream* output) {
(*output) << "<h2>Configurations</h2>";
(*output) << "<pre>";
std::lock_guard<std::mutex> lock(*config::get_mutable_string_config_lock());
for (const auto& it : *(config::full_conf_map)) {
(*output) << it.first << "=" << it.second << std::endl;
}
Expand Down
3 changes: 2 additions & 1 deletion be/src/olap/compaction.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,8 @@ OLAPStatus Compaction::do_compaction_impl(int64_t permits) {
<< ", output_version=" << _output_version.first << "-" << _output_version.second
<< ", current_max_version=" << _tablet->rowset_with_max_version()->end_version()
<< ", disk=" << _tablet->data_dir()->path() << ", segments=" << segments_num
<< ". elapsed time=" << watch.get_elapse_second() << "s.";
<< ". elapsed time=" << watch.get_elapse_second() << "s. cumulative_compaction_policy="
<< _tablet->cumulative_compaction_policy()->name() << ".";

return OLAP_SUCCESS;
}
Expand Down
5 changes: 2 additions & 3 deletions be/src/olap/cumulative_compaction_policy.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -438,7 +438,7 @@ void CumulativeCompactionPolicy::pick_candidate_rowsets(
std::sort(candidate_rowsets->begin(), candidate_rowsets->end(), Rowset::comparator);
}

std::unique_ptr<CumulativeCompactionPolicy>
std::shared_ptr<CumulativeCompactionPolicy>
CumulativeCompactionPolicyFactory::create_cumulative_compaction_policy(std::string type) {
CompactionPolicy policy_type;
_parse_cumulative_compaction_policy(type, &policy_type);
Expand All @@ -451,12 +451,11 @@ CumulativeCompactionPolicyFactory::create_cumulative_compaction_policy(std::stri
new SizeBasedCumulativeCompactionPolicy());
}

return std::unique_ptr<CumulativeCompactionPolicy>(new NumBasedCumulativeCompactionPolicy());
return std::shared_ptr<CumulativeCompactionPolicy>(new NumBasedCumulativeCompactionPolicy());
}

void CumulativeCompactionPolicyFactory::_parse_cumulative_compaction_policy(
std::string type, CompactionPolicy* policy_type) {
boost::to_upper(type);
if (type == CUMULATIVE_NUM_BASED_POLICY) {
*policy_type = NUM_BASED_POLICY;
} else if (type == CUMULATIVE_SIZE_BASED_POLICY) {
Expand Down
2 changes: 1 addition & 1 deletion be/src/olap/cumulative_compaction_policy.h
Original file line number Diff line number Diff line change
Expand Up @@ -248,7 +248,7 @@ class CumulativeCompactionPolicyFactory {
public:
/// Static factory function. It can product different policy according to the `policy` parameter and use tablet ptr
/// to construct the policy. Now it can product size based and num based policies.
static std::unique_ptr<CumulativeCompactionPolicy> create_cumulative_compaction_policy(
static std::shared_ptr<CumulativeCompactionPolicy> create_cumulative_compaction_policy(
std::string policy);

private:
Expand Down
62 changes: 35 additions & 27 deletions be/src/olap/olap_server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -64,9 +64,6 @@ Status StorageEngine::start_bg_threads() {
data_dirs.push_back(tmp_store.second);
}

// check cumulative compaction config
_check_cumulative_compaction_config();

int32_t max_thread_num = config::max_compaction_threads;
int32_t min_thread_num = config::min_compaction_threads;
ThreadPoolBuilder("CompactionTaskThreadPool")
Expand Down Expand Up @@ -208,29 +205,23 @@ void StorageEngine::_disk_stat_monitor_thread_callback() {
}

void StorageEngine::_check_cumulative_compaction_config() {
std::string cumulative_compaction_type = config::cumulative_compaction_policy;
boost::to_upper(cumulative_compaction_type);

// if size_based policy is used, check size_based policy configs
if (cumulative_compaction_type == CUMULATIVE_SIZE_BASED_POLICY) {
int64_t size_based_promotion_size = config::cumulative_size_based_promotion_size_mbytes;
int64_t size_based_promotion_min_size =
config::cumulative_size_based_promotion_min_size_mbytes;
int64_t size_based_compaction_lower_bound_size =
config::cumulative_size_based_compaction_lower_size_mbytes;

// check size_based_promotion_size must be greater than size_based_promotion_min_size and 2 * size_based_compaction_lower_bound_size
int64_t should_min_size_based_promotion_size =
std::max(size_based_promotion_min_size, 2 * size_based_compaction_lower_bound_size);

if (size_based_promotion_size < should_min_size_based_promotion_size) {
size_based_promotion_size = should_min_size_based_promotion_size;
LOG(WARNING) << "the config size_based_promotion_size is adjusted to "
"size_based_promotion_min_size or 2 * "
"size_based_compaction_lower_bound_size "
<< should_min_size_based_promotion_size
<< ", because size_based_promotion_size is small";
}
int64_t size_based_promotion_size = config::cumulative_size_based_promotion_size_mbytes;
int64_t size_based_promotion_min_size =
config::cumulative_size_based_promotion_min_size_mbytes;
int64_t size_based_compaction_lower_bound_size =
config::cumulative_size_based_compaction_lower_size_mbytes;

// check size_based_promotion_size must be greater than size_based_promotion_min_size and 2 * size_based_compaction_lower_bound_size
int64_t should_min_size_based_promotion_size =
std::max(size_based_promotion_min_size, 2 * size_based_compaction_lower_bound_size);

if (size_based_promotion_size < should_min_size_based_promotion_size) {
size_based_promotion_size = should_min_size_based_promotion_size;
LOG(WARNING) << "the config size_based_promotion_size is adjusted to "
"size_based_promotion_min_size or 2 * "
"size_based_compaction_lower_bound_size "
<< should_min_size_based_promotion_size
<< ", because size_based_promotion_size is small";
}
}

Expand Down Expand Up @@ -409,6 +400,22 @@ void StorageEngine::_compaction_tasks_producer_callback() {

std::vector<TabletSharedPtr> StorageEngine::_compaction_tasks_generator(
CompactionType compaction_type, std::vector<DataDir*>& data_dirs, bool check_score) {
std::string current_policy = "";
{
std::lock_guard<std::mutex> lock(*config::get_mutable_string_config_lock());
current_policy = config::cumulative_compaction_policy;
}
boost::to_upper(current_policy);
if (_cumulative_compaction_policy == nullptr ||
_cumulative_compaction_policy->name() != current_policy) {
if (current_policy == CUMULATIVE_SIZE_BASED_POLICY) {
// check size_based cumulative compaction config
_check_cumulative_compaction_config();
}
_cumulative_compaction_policy =
CumulativeCompactionPolicyFactory::create_cumulative_compaction_policy(current_policy);
}

std::vector<TabletSharedPtr> tablets_compaction;
uint32_t max_compaction_score = 0;
bool need_pick_tablet = true;
Expand Down Expand Up @@ -448,7 +455,8 @@ std::vector<TabletSharedPtr> StorageEngine::_compaction_tasks_generator(
if (!data_dir->reach_capacity_limit(0)) {
uint32_t disk_max_score = 0;
TabletSharedPtr tablet = _tablet_manager->find_best_tablet_to_compaction(
compaction_type, data_dir, _tablet_submitted_compaction[data_dir], &disk_max_score);
compaction_type, data_dir, _tablet_submitted_compaction[data_dir], &disk_max_score,
_cumulative_compaction_policy);
if (tablet != nullptr) {
tablets_compaction.emplace_back(tablet);
max_compaction_score = std::max(max_compaction_score, disk_max_score);
Expand Down
2 changes: 2 additions & 0 deletions be/src/olap/storage_engine.h
Original file line number Diff line number Diff line change
Expand Up @@ -357,6 +357,8 @@ class StorageEngine {

std::shared_ptr<StreamLoadRecorder> _stream_load_recorder;

std::shared_ptr<CumulativeCompactionPolicy> _cumulative_compaction_policy;

DISALLOW_COPY_AND_ASSIGN(StorageEngine);
};

Expand Down
16 changes: 13 additions & 3 deletions be/src/olap/tablet.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -87,10 +87,12 @@ OLAPStatus Tablet::_init_once_action() {
VLOG_NOTICE << "begin to load tablet. tablet=" << full_name()
<< ", version_size=" << _tablet_meta->version_count();

#ifdef BE_TEST
// init cumulative compaction policy by type
_cumulative_compaction_policy =
CumulativeCompactionPolicyFactory::create_cumulative_compaction_policy(
_cumulative_compaction_type);
#endif

for (const auto& rs_meta : _tablet_meta->all_rs_metas()) {
Version version = rs_meta->version();
Expand Down Expand Up @@ -720,18 +722,26 @@ bool Tablet::can_do_compaction() {
return true;
}

const uint32_t Tablet::calc_compaction_score(CompactionType compaction_type) const {
uint32_t Tablet::calc_compaction_score(CompactionType compaction_type,
std::shared_ptr<CumulativeCompactionPolicy> cumulative_compaction_policy) {
// Need meta lock, because it will iterator "all_rs_metas" of tablet meta.
ReadLock rdlock(&_meta_lock);
if (compaction_type == CompactionType::CUMULATIVE_COMPACTION) {
return _calc_cumulative_compaction_score();
return _calc_cumulative_compaction_score(cumulative_compaction_policy);
} else {
DCHECK_EQ(compaction_type, CompactionType::BASE_COMPACTION);
return _calc_base_compaction_score();
}
}

const uint32_t Tablet::_calc_cumulative_compaction_score() const {
const uint32_t Tablet::_calc_cumulative_compaction_score(
std::shared_ptr<CumulativeCompactionPolicy> cumulative_compaction_policy) {
#ifndef BE_TEST
if (_cumulative_compaction_policy == nullptr || _cumulative_compaction_policy->name() !=
cumulative_compaction_policy->name()) {
_cumulative_compaction_policy = cumulative_compaction_policy;
}
#endif
uint32_t score = 0;
_cumulative_compaction_policy->calc_cumulative_compaction_score(
_tablet_meta->all_rs_metas(), cumulative_layer_point(), &score);
Expand Down
10 changes: 6 additions & 4 deletions be/src/olap/tablet.h
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ class Tablet : public BaseTablet {
DataDir* data_dir = nullptr);

Tablet(TabletMetaSharedPtr tablet_meta, DataDir* data_dir,
const std::string& cumulative_compaction_type = config::cumulative_compaction_policy);
const std::string& cumulative_compaction_type = "");

OLAPStatus init();
inline bool init_succeeded();
Expand Down Expand Up @@ -162,7 +162,8 @@ class Tablet : public BaseTablet {

// operation for compaction
bool can_do_compaction();
const uint32_t calc_compaction_score(CompactionType compaction_type) const;
uint32_t calc_compaction_score(CompactionType compaction_type,
std::shared_ptr<CumulativeCompactionPolicy> cumulative_compaction_policy);
static void compute_version_hash_from_rowsets(const std::vector<RowsetSharedPtr>& rowsets,
VersionHash* version_hash);

Expand Down Expand Up @@ -263,7 +264,8 @@ class Tablet : public BaseTablet {
OLAPStatus _capture_consistent_rowsets_unlocked(const vector<Version>& version_path,
vector<RowsetSharedPtr>* rowsets) const;

const uint32_t _calc_cumulative_compaction_score() const;
const uint32_t _calc_cumulative_compaction_score(
std::shared_ptr<CumulativeCompactionPolicy> cumulative_compaction_policy);
const uint32_t _calc_base_compaction_score() const;

public:
Expand Down Expand Up @@ -307,7 +309,7 @@ class Tablet : public BaseTablet {
std::atomic<int64_t> _last_checkpoint_time;

// cumulative compaction policy
std::unique_ptr<CumulativeCompactionPolicy> _cumulative_compaction_policy;
std::shared_ptr<CumulativeCompactionPolicy> _cumulative_compaction_policy;
std::string _cumulative_compaction_type;

// the value of metric 'query_scan_count' and timestamp will be recorded when every time
Expand Down
5 changes: 3 additions & 2 deletions be/src/olap/tablet_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -698,7 +698,8 @@ void TabletManager::get_tablet_stat(TTabletStatResult* result) {

TabletSharedPtr TabletManager::find_best_tablet_to_compaction(
CompactionType compaction_type, DataDir* data_dir,
const std::map<TTabletId, CompactionType>& tablet_submitted_compaction, uint32_t* score) {
const std::map<TTabletId, CompactionType>& tablet_submitted_compaction, uint32_t* score,
std::shared_ptr<CumulativeCompactionPolicy> cumulative_compaction_policy) {
int64_t now_ms = UnixMillis();
const string& compaction_type_str =
compaction_type == CompactionType::BASE_COMPACTION ? "base" : "cumulative";
Expand Down Expand Up @@ -764,7 +765,7 @@ TabletSharedPtr TabletManager::find_best_tablet_to_compaction(
}

uint32_t current_compaction_score =
tablet_ptr->calc_compaction_score(compaction_type);
tablet_ptr->calc_compaction_score(compaction_type, cumulative_compaction_policy);

double scan_frequency = 0.0;
if (config::compaction_tablet_scan_frequency_factor != 0) {
Expand Down
3 changes: 2 additions & 1 deletion be/src/olap/tablet_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,8 @@ class TabletManager {
TabletSharedPtr find_best_tablet_to_compaction(CompactionType compaction_type,
DataDir* data_dir,
const std::map<TTabletId, CompactionType>& tablet_submitted_compaction,
uint32_t* score);
uint32_t* score,
std::shared_ptr<CumulativeCompactionPolicy> cumulative_compaction_policy);

TabletSharedPtr get_tablet(TTabletId tablet_id, SchemaHash schema_hash,
bool include_deleted = false, std::string* err = nullptr);
Expand Down
18 changes: 15 additions & 3 deletions be/test/olap/cumulative_compaction_policy_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -210,8 +210,12 @@ TEST_F(TestNumBasedCumulativeCompactionPolicy, calc_cumulative_compaction_score)

TabletSharedPtr _tablet(new Tablet(_tablet_meta, nullptr, CUMULATIVE_NUM_BASED_POLICY));
_tablet->init();
std::shared_ptr<CumulativeCompactionPolicy> cumulative_compaction_policy =
CumulativeCompactionPolicyFactory::create_cumulative_compaction_policy(
CUMULATIVE_NUM_BASED_POLICY);

const uint32_t score = _tablet->calc_compaction_score(CompactionType::CUMULATIVE_COMPACTION);
const uint32_t score = _tablet->calc_compaction_score(CompactionType::CUMULATIVE_COMPACTION,
cumulative_compaction_policy);

ASSERT_EQ(15, score);
}
Expand Down Expand Up @@ -665,7 +669,11 @@ TEST_F(TestSizeBasedCumulativeCompactionPolicy, calc_cumulative_compaction_score
_tablet->init();
_tablet->calculate_cumulative_point();

const uint32_t score = _tablet->calc_compaction_score(CompactionType::CUMULATIVE_COMPACTION);
std::shared_ptr<CumulativeCompactionPolicy> cumulative_compaction_policy =
CumulativeCompactionPolicyFactory::create_cumulative_compaction_policy(
CUMULATIVE_SIZE_BASED_POLICY);
const uint32_t score = _tablet->calc_compaction_score(CompactionType::CUMULATIVE_COMPACTION,
cumulative_compaction_policy);

ASSERT_EQ(15, score);
}
Expand All @@ -681,7 +689,11 @@ TEST_F(TestSizeBasedCumulativeCompactionPolicy, calc_cumulative_compaction_score
TabletSharedPtr _tablet(new Tablet(_tablet_meta, nullptr, CUMULATIVE_SIZE_BASED_POLICY));
_tablet->init();
_tablet->calculate_cumulative_point();
const uint32_t score = _tablet->calc_compaction_score(CompactionType::CUMULATIVE_COMPACTION);
std::shared_ptr<CumulativeCompactionPolicy> cumulative_compaction_policy =
CumulativeCompactionPolicyFactory::create_cumulative_compaction_policy(
CUMULATIVE_SIZE_BASED_POLICY);
const uint32_t score = _tablet->calc_compaction_score(CompactionType::CUMULATIVE_COMPACTION,
cumulative_compaction_policy);

ASSERT_EQ(7, score);
}
Expand Down

0 comments on commit e519a24

Please sign in to comment.