From e519a24c9acc1d135953abe9cb1a9066c29e50ec Mon Sep 17 00:00:00 2001 From: weizuo93 <68884553+weizuo93@users.noreply.github.com> Date: Mon, 26 Apr 2021 12:39:13 +0800 Subject: [PATCH] dynamic adjust compaction policy (#5651) Co-authored-by: weizuo --- be/src/common/config.h | 2 +- be/src/common/configbase.cpp | 9 +++ be/src/common/configbase.h | 4 ++ be/src/http/default_path_handlers.cpp | 1 + be/src/olap/compaction.cpp | 3 +- be/src/olap/cumulative_compaction_policy.cpp | 5 +- be/src/olap/cumulative_compaction_policy.h | 2 +- be/src/olap/olap_server.cpp | 62 +++++++++++-------- be/src/olap/storage_engine.h | 2 + be/src/olap/tablet.cpp | 16 ++++- be/src/olap/tablet.h | 10 +-- be/src/olap/tablet_manager.cpp | 5 +- be/src/olap/tablet_manager.h | 3 +- .../cumulative_compaction_policy_test.cpp | 18 +++++- 14 files changed, 96 insertions(+), 46 deletions(-) diff --git a/be/src/common/config.h b/be/src/common/config.h index 3c04c3a6aeac16..023c48edb0e573 100644 --- a/be/src/common/config.h +++ b/be/src/common/config.h @@ -247,7 +247,7 @@ CONF_mInt32(base_compaction_write_mbytes_per_sec, "5"); // num_based policy, the original version of cumulative compaction, cumulative version compaction once. // size_based policy, a optimization version of cumulative compaction, targeting the use cases requiring // lower write amplification, trading off read amplification and space amplification. -CONF_String(cumulative_compaction_policy, "size_based"); +CONF_mString(cumulative_compaction_policy, "size_based"); CONF_Validator(cumulative_compaction_policy, [](const std::string config) -> bool { return config == "size_based" || config == "num_based"; }); diff --git a/be/src/common/configbase.cpp b/be/src/common/configbase.cpp index 36645c5cd3a9cd..dc5a5b27a51627 100644 --- a/be/src/common/configbase.cpp +++ b/be/src/common/configbase.cpp @@ -41,6 +41,8 @@ std::map* full_conf_map = nullptr; std::mutex custom_conf_lock; +std::mutex mutable_string_config_lock; + // trim string std::string& trim(std::string& s) { // rtrim @@ -416,11 +418,18 @@ Status set_config(const std::string& field, const std::string& value, bool need_ UPDATE_FIELD(it->second, value, int32_t, need_persist); UPDATE_FIELD(it->second, value, int64_t, need_persist); UPDATE_FIELD(it->second, value, double, need_persist); + { + // add lock to ensure thread safe + std::lock_guard lock(mutable_string_config_lock); + UPDATE_FIELD(it->second, value, std::string, need_persist); + } // The other types are not thread safe to change dynamically. return Status::NotSupported(strings::Substitute( "'$0' is type of '$1' which is not support to modify", field, it->second.type)); } +std::mutex* get_mutable_string_config_lock() { return &mutable_string_config_lock; } + } // namespace config } // namespace doris diff --git a/be/src/common/configbase.h b/be/src/common/configbase.h index d24baa19b61329..e21be856470866 100644 --- a/be/src/common/configbase.h +++ b/be/src/common/configbase.h @@ -116,6 +116,7 @@ class RegisterConfValidator { #define CONF_mInt32(name, defaultstr) DEFINE_FIELD(int32_t, name, defaultstr, true) #define CONF_mInt64(name, defaultstr) DEFINE_FIELD(int64_t, name, defaultstr, true) #define CONF_mDouble(name, defaultstr) DEFINE_FIELD(double, name, defaultstr, true) +#define CONF_mString(name, defaultstr) DEFINE_FIELD(std::string, name, defaultstr, true) #define CONF_Validator(name, validator) DEFINE_VALIDATOR(name, validator) #else @@ -136,6 +137,7 @@ class RegisterConfValidator { #define CONF_mInt32(name, defaultstr) DECLARE_FIELD(int32_t, name) #define CONF_mInt64(name, defaultstr) DECLARE_FIELD(int64_t, name) #define CONF_mDouble(name, defaultstr) DECLARE_FIELD(double, name) +#define CONF_mString(name, defaultstr) DECLARE_FIELD(std::string, name) #define CONF_Validator(name, validator) DECLARE_VALIDATOR(name) #endif @@ -177,6 +179,8 @@ Status set_config(const std::string& field, const std::string& value, bool need_ bool persist_config(const std::string& field, const std::string& value); +std::mutex* get_mutable_string_config_lock(); + } // namespace config } // namespace doris diff --git a/be/src/http/default_path_handlers.cpp b/be/src/http/default_path_handlers.cpp index 5ea00952ba65b4..b24267562e5052 100644 --- a/be/src/http/default_path_handlers.cpp +++ b/be/src/http/default_path_handlers.cpp @@ -74,6 +74,7 @@ void logs_handler(const WebPageHandler::ArgumentMap& args, std::stringstream* ou void config_handler(const WebPageHandler::ArgumentMap& args, std::stringstream* output) { (*output) << "

Configurations

"; (*output) << "
";
+    std::lock_guard lock(*config::get_mutable_string_config_lock());
     for (const auto& it : *(config::full_conf_map)) {
         (*output) << it.first << "=" << it.second << std::endl;
     }
diff --git a/be/src/olap/compaction.cpp b/be/src/olap/compaction.cpp
index 72f23cdc8b669a..6a47c89da9a969 100644
--- a/be/src/olap/compaction.cpp
+++ b/be/src/olap/compaction.cpp
@@ -136,7 +136,8 @@ OLAPStatus Compaction::do_compaction_impl(int64_t permits) {
               << ", output_version=" << _output_version.first << "-" << _output_version.second
               << ", current_max_version=" << _tablet->rowset_with_max_version()->end_version()
               << ", disk=" << _tablet->data_dir()->path() << ", segments=" << segments_num
-              << ". elapsed time=" << watch.get_elapse_second() << "s.";
+              << ". elapsed time=" << watch.get_elapse_second() << "s. cumulative_compaction_policy="
+              << _tablet->cumulative_compaction_policy()->name() << ".";
 
     return OLAP_SUCCESS;
 }
diff --git a/be/src/olap/cumulative_compaction_policy.cpp b/be/src/olap/cumulative_compaction_policy.cpp
index e6e9392121d977..529a132fc45518 100644
--- a/be/src/olap/cumulative_compaction_policy.cpp
+++ b/be/src/olap/cumulative_compaction_policy.cpp
@@ -438,7 +438,7 @@ void CumulativeCompactionPolicy::pick_candidate_rowsets(
     std::sort(candidate_rowsets->begin(), candidate_rowsets->end(), Rowset::comparator);
 }
 
-std::unique_ptr
+std::shared_ptr
 CumulativeCompactionPolicyFactory::create_cumulative_compaction_policy(std::string type) {
     CompactionPolicy policy_type;
     _parse_cumulative_compaction_policy(type, &policy_type);
@@ -451,12 +451,11 @@ CumulativeCompactionPolicyFactory::create_cumulative_compaction_policy(std::stri
                 new SizeBasedCumulativeCompactionPolicy());
     }
 
-    return std::unique_ptr(new NumBasedCumulativeCompactionPolicy());
+    return std::shared_ptr(new NumBasedCumulativeCompactionPolicy());
 }
 
 void CumulativeCompactionPolicyFactory::_parse_cumulative_compaction_policy(
         std::string type, CompactionPolicy* policy_type) {
-    boost::to_upper(type);
     if (type == CUMULATIVE_NUM_BASED_POLICY) {
         *policy_type = NUM_BASED_POLICY;
     } else if (type == CUMULATIVE_SIZE_BASED_POLICY) {
diff --git a/be/src/olap/cumulative_compaction_policy.h b/be/src/olap/cumulative_compaction_policy.h
index 35d0ce37ceb3a3..038b136a6a05e3 100644
--- a/be/src/olap/cumulative_compaction_policy.h
+++ b/be/src/olap/cumulative_compaction_policy.h
@@ -248,7 +248,7 @@ class CumulativeCompactionPolicyFactory {
 public:
     /// Static factory function. It can product different policy according to the `policy` parameter and use tablet ptr
     /// to construct the policy. Now it can product size based and num based policies.
-    static std::unique_ptr create_cumulative_compaction_policy(
+    static std::shared_ptr create_cumulative_compaction_policy(
             std::string policy);
 
 private:
diff --git a/be/src/olap/olap_server.cpp b/be/src/olap/olap_server.cpp
index 488ac63fc04716..871e096087999c 100644
--- a/be/src/olap/olap_server.cpp
+++ b/be/src/olap/olap_server.cpp
@@ -64,9 +64,6 @@ Status StorageEngine::start_bg_threads() {
         data_dirs.push_back(tmp_store.second);
     }
 
-    // check cumulative compaction config
-    _check_cumulative_compaction_config();
-
     int32_t max_thread_num = config::max_compaction_threads;
     int32_t min_thread_num = config::min_compaction_threads;
     ThreadPoolBuilder("CompactionTaskThreadPool")
@@ -208,29 +205,23 @@ void StorageEngine::_disk_stat_monitor_thread_callback() {
 }
 
 void StorageEngine::_check_cumulative_compaction_config() {
-    std::string cumulative_compaction_type = config::cumulative_compaction_policy;
-    boost::to_upper(cumulative_compaction_type);
-
-    // if size_based policy is used, check size_based policy configs
-    if (cumulative_compaction_type == CUMULATIVE_SIZE_BASED_POLICY) {
-        int64_t size_based_promotion_size = config::cumulative_size_based_promotion_size_mbytes;
-        int64_t size_based_promotion_min_size =
-                config::cumulative_size_based_promotion_min_size_mbytes;
-        int64_t size_based_compaction_lower_bound_size =
-                config::cumulative_size_based_compaction_lower_size_mbytes;
-
-        // check size_based_promotion_size must be greater than size_based_promotion_min_size and 2 * size_based_compaction_lower_bound_size
-        int64_t should_min_size_based_promotion_size =
-                std::max(size_based_promotion_min_size, 2 * size_based_compaction_lower_bound_size);
-
-        if (size_based_promotion_size < should_min_size_based_promotion_size) {
-            size_based_promotion_size = should_min_size_based_promotion_size;
-            LOG(WARNING) << "the config size_based_promotion_size is adjusted to "
-                            "size_based_promotion_min_size or  2 * "
-                            "size_based_compaction_lower_bound_size "
-                         << should_min_size_based_promotion_size
-                         << ", because size_based_promotion_size is small";
-        }
+    int64_t size_based_promotion_size = config::cumulative_size_based_promotion_size_mbytes;
+    int64_t size_based_promotion_min_size =
+            config::cumulative_size_based_promotion_min_size_mbytes;
+    int64_t size_based_compaction_lower_bound_size =
+            config::cumulative_size_based_compaction_lower_size_mbytes;
+
+    // check size_based_promotion_size must be greater than size_based_promotion_min_size and 2 * size_based_compaction_lower_bound_size
+    int64_t should_min_size_based_promotion_size =
+            std::max(size_based_promotion_min_size, 2 * size_based_compaction_lower_bound_size);
+
+    if (size_based_promotion_size < should_min_size_based_promotion_size) {
+        size_based_promotion_size = should_min_size_based_promotion_size;
+        LOG(WARNING) << "the config size_based_promotion_size is adjusted to "
+                        "size_based_promotion_min_size or  2 * "
+                        "size_based_compaction_lower_bound_size "
+                     << should_min_size_based_promotion_size
+                     << ", because size_based_promotion_size is small";
     }
 }
 
@@ -409,6 +400,22 @@ void StorageEngine::_compaction_tasks_producer_callback() {
 
 std::vector StorageEngine::_compaction_tasks_generator(
         CompactionType compaction_type, std::vector& data_dirs, bool check_score) {
+    std::string current_policy = "";
+    {
+        std::lock_guard lock(*config::get_mutable_string_config_lock());
+        current_policy = config::cumulative_compaction_policy;
+    }
+    boost::to_upper(current_policy);
+    if (_cumulative_compaction_policy == nullptr ||
+        _cumulative_compaction_policy->name() != current_policy) {
+        if (current_policy == CUMULATIVE_SIZE_BASED_POLICY) {
+            // check size_based cumulative compaction config
+            _check_cumulative_compaction_config();
+        }
+        _cumulative_compaction_policy =
+                CumulativeCompactionPolicyFactory::create_cumulative_compaction_policy(current_policy);
+    }
+
     std::vector tablets_compaction;
     uint32_t max_compaction_score = 0;
     bool need_pick_tablet = true;
@@ -448,7 +455,8 @@ std::vector StorageEngine::_compaction_tasks_generator(
         if (!data_dir->reach_capacity_limit(0)) {
             uint32_t disk_max_score = 0;
             TabletSharedPtr tablet = _tablet_manager->find_best_tablet_to_compaction(
-                    compaction_type, data_dir, _tablet_submitted_compaction[data_dir], &disk_max_score);
+                    compaction_type, data_dir, _tablet_submitted_compaction[data_dir], &disk_max_score,
+                    _cumulative_compaction_policy);
             if (tablet != nullptr) {
                 tablets_compaction.emplace_back(tablet);
                 max_compaction_score = std::max(max_compaction_score, disk_max_score);
diff --git a/be/src/olap/storage_engine.h b/be/src/olap/storage_engine.h
index ef7662421c148d..3ba035bb4bf8d9 100644
--- a/be/src/olap/storage_engine.h
+++ b/be/src/olap/storage_engine.h
@@ -357,6 +357,8 @@ class StorageEngine {
 
     std::shared_ptr _stream_load_recorder;
 
+    std::shared_ptr _cumulative_compaction_policy;
+
     DISALLOW_COPY_AND_ASSIGN(StorageEngine);
 };
 
diff --git a/be/src/olap/tablet.cpp b/be/src/olap/tablet.cpp
index 0c0671070440c6..8d99c8cff7acc1 100644
--- a/be/src/olap/tablet.cpp
+++ b/be/src/olap/tablet.cpp
@@ -87,10 +87,12 @@ OLAPStatus Tablet::_init_once_action() {
     VLOG_NOTICE << "begin to load tablet. tablet=" << full_name()
             << ", version_size=" << _tablet_meta->version_count();
 
+#ifdef BE_TEST
     // init cumulative compaction policy by type
     _cumulative_compaction_policy =
             CumulativeCompactionPolicyFactory::create_cumulative_compaction_policy(
                     _cumulative_compaction_type);
+#endif
 
     for (const auto& rs_meta : _tablet_meta->all_rs_metas()) {
         Version version = rs_meta->version();
@@ -720,18 +722,26 @@ bool Tablet::can_do_compaction() {
     return true;
 }
 
-const uint32_t Tablet::calc_compaction_score(CompactionType compaction_type) const {
+uint32_t Tablet::calc_compaction_score(CompactionType compaction_type,
+            std::shared_ptr cumulative_compaction_policy) {
     // Need meta lock, because it will iterator "all_rs_metas" of tablet meta.
     ReadLock rdlock(&_meta_lock);
     if (compaction_type == CompactionType::CUMULATIVE_COMPACTION) {
-        return _calc_cumulative_compaction_score();
+        return _calc_cumulative_compaction_score(cumulative_compaction_policy);
     } else {
         DCHECK_EQ(compaction_type, CompactionType::BASE_COMPACTION);
         return _calc_base_compaction_score();
     }
 }
 
-const uint32_t Tablet::_calc_cumulative_compaction_score() const {
+const uint32_t Tablet::_calc_cumulative_compaction_score(
+        std::shared_ptr cumulative_compaction_policy) {
+#ifndef BE_TEST
+    if (_cumulative_compaction_policy == nullptr || _cumulative_compaction_policy->name() !=
+                                                        cumulative_compaction_policy->name()) {
+        _cumulative_compaction_policy = cumulative_compaction_policy;
+    }
+#endif
     uint32_t score = 0;
     _cumulative_compaction_policy->calc_cumulative_compaction_score(
             _tablet_meta->all_rs_metas(), cumulative_layer_point(), &score);
diff --git a/be/src/olap/tablet.h b/be/src/olap/tablet.h
index 15e68588c1fb4e..8850917253a65c 100644
--- a/be/src/olap/tablet.h
+++ b/be/src/olap/tablet.h
@@ -56,7 +56,7 @@ class Tablet : public BaseTablet {
                                                    DataDir* data_dir = nullptr);
 
     Tablet(TabletMetaSharedPtr tablet_meta, DataDir* data_dir,
-           const std::string& cumulative_compaction_type = config::cumulative_compaction_policy);
+           const std::string& cumulative_compaction_type = "");
 
     OLAPStatus init();
     inline bool init_succeeded();
@@ -162,7 +162,8 @@ class Tablet : public BaseTablet {
 
     // operation for compaction
     bool can_do_compaction();
-    const uint32_t calc_compaction_score(CompactionType compaction_type) const;
+    uint32_t calc_compaction_score(CompactionType compaction_type,
+               std::shared_ptr cumulative_compaction_policy);
     static void compute_version_hash_from_rowsets(const std::vector& rowsets,
                                                   VersionHash* version_hash);
 
@@ -263,7 +264,8 @@ class Tablet : public BaseTablet {
     OLAPStatus _capture_consistent_rowsets_unlocked(const vector& version_path,
                                                     vector* rowsets) const;
 
-    const uint32_t _calc_cumulative_compaction_score() const;
+    const uint32_t _calc_cumulative_compaction_score(
+            std::shared_ptr cumulative_compaction_policy);
     const uint32_t _calc_base_compaction_score() const;
 
 public:
@@ -307,7 +309,7 @@ class Tablet : public BaseTablet {
     std::atomic _last_checkpoint_time;
 
     // cumulative compaction policy
-    std::unique_ptr _cumulative_compaction_policy;
+    std::shared_ptr _cumulative_compaction_policy;
     std::string _cumulative_compaction_type;
 
     // the value of metric 'query_scan_count' and timestamp will be recorded when every time
diff --git a/be/src/olap/tablet_manager.cpp b/be/src/olap/tablet_manager.cpp
index c35c5c88cd17aa..ee088e6d4094ec 100644
--- a/be/src/olap/tablet_manager.cpp
+++ b/be/src/olap/tablet_manager.cpp
@@ -698,7 +698,8 @@ void TabletManager::get_tablet_stat(TTabletStatResult* result) {
 
 TabletSharedPtr TabletManager::find_best_tablet_to_compaction(
         CompactionType compaction_type, DataDir* data_dir,
-        const std::map& tablet_submitted_compaction, uint32_t* score) {
+        const std::map& tablet_submitted_compaction, uint32_t* score,
+        std::shared_ptr cumulative_compaction_policy) {
     int64_t now_ms = UnixMillis();
     const string& compaction_type_str =
             compaction_type == CompactionType::BASE_COMPACTION ? "base" : "cumulative";
@@ -764,7 +765,7 @@ TabletSharedPtr TabletManager::find_best_tablet_to_compaction(
                 }
 
                 uint32_t current_compaction_score =
-                        tablet_ptr->calc_compaction_score(compaction_type);
+                        tablet_ptr->calc_compaction_score(compaction_type, cumulative_compaction_policy);
 
                 double scan_frequency = 0.0;
                 if (config::compaction_tablet_scan_frequency_factor != 0) {
diff --git a/be/src/olap/tablet_manager.h b/be/src/olap/tablet_manager.h
index c021d757a474e1..f043b7cdf9ee29 100644
--- a/be/src/olap/tablet_manager.h
+++ b/be/src/olap/tablet_manager.h
@@ -72,7 +72,8 @@ class TabletManager {
     TabletSharedPtr find_best_tablet_to_compaction(CompactionType compaction_type,
                                                    DataDir* data_dir,
                                                    const std::map& tablet_submitted_compaction,
-                                                   uint32_t* score);
+                                                   uint32_t* score,
+                                                   std::shared_ptr cumulative_compaction_policy);
 
     TabletSharedPtr get_tablet(TTabletId tablet_id, SchemaHash schema_hash,
                                bool include_deleted = false, std::string* err = nullptr);
diff --git a/be/test/olap/cumulative_compaction_policy_test.cpp b/be/test/olap/cumulative_compaction_policy_test.cpp
index a63c4ceca3ef9a..bc61071bec1f03 100644
--- a/be/test/olap/cumulative_compaction_policy_test.cpp
+++ b/be/test/olap/cumulative_compaction_policy_test.cpp
@@ -210,8 +210,12 @@ TEST_F(TestNumBasedCumulativeCompactionPolicy, calc_cumulative_compaction_score)
 
     TabletSharedPtr _tablet(new Tablet(_tablet_meta, nullptr, CUMULATIVE_NUM_BASED_POLICY));
     _tablet->init();
+    std::shared_ptr cumulative_compaction_policy =
+                        CumulativeCompactionPolicyFactory::create_cumulative_compaction_policy(
+                    CUMULATIVE_NUM_BASED_POLICY);
 
-    const uint32_t score = _tablet->calc_compaction_score(CompactionType::CUMULATIVE_COMPACTION);
+    const uint32_t score = _tablet->calc_compaction_score(CompactionType::CUMULATIVE_COMPACTION,
+                                                          cumulative_compaction_policy);
 
     ASSERT_EQ(15, score);
 }
@@ -665,7 +669,11 @@ TEST_F(TestSizeBasedCumulativeCompactionPolicy, calc_cumulative_compaction_score
     _tablet->init();
     _tablet->calculate_cumulative_point();
 
-    const uint32_t score = _tablet->calc_compaction_score(CompactionType::CUMULATIVE_COMPACTION);
+    std::shared_ptr cumulative_compaction_policy =
+            CumulativeCompactionPolicyFactory::create_cumulative_compaction_policy(
+                    CUMULATIVE_SIZE_BASED_POLICY);
+    const uint32_t score = _tablet->calc_compaction_score(CompactionType::CUMULATIVE_COMPACTION,
+                                                          cumulative_compaction_policy);
 
     ASSERT_EQ(15, score);
 }
@@ -681,7 +689,11 @@ TEST_F(TestSizeBasedCumulativeCompactionPolicy, calc_cumulative_compaction_score
     TabletSharedPtr _tablet(new Tablet(_tablet_meta, nullptr, CUMULATIVE_SIZE_BASED_POLICY));
     _tablet->init();
     _tablet->calculate_cumulative_point();
-    const uint32_t score = _tablet->calc_compaction_score(CompactionType::CUMULATIVE_COMPACTION);
+    std::shared_ptr cumulative_compaction_policy =
+            CumulativeCompactionPolicyFactory::create_cumulative_compaction_policy(
+                    CUMULATIVE_SIZE_BASED_POLICY);
+    const uint32_t score = _tablet->calc_compaction_score(CompactionType::CUMULATIVE_COMPACTION,
+                                                          cumulative_compaction_policy);
 
     ASSERT_EQ(7, score);
 }