Skip to content

Commit

Permalink
[Enhance] Support to configure RocksDB to improve tablet meta store
Browse files Browse the repository at this point in the history
  • Loading branch information
acelyc111 committed Apr 23, 2021
1 parent 12b2447 commit e864eb8
Show file tree
Hide file tree
Showing 19 changed files with 130 additions and 160 deletions.
20 changes: 15 additions & 5 deletions be/src/common/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -204,8 +204,6 @@ CONF_mInt32(max_percentage_of_error_disk, "0");
CONF_mInt32(default_num_rows_per_column_file_block, "1024");
// pending data policy
CONF_mInt32(pending_data_expire_time_sec, "1800");
// inc_rowset expired interval
CONF_mInt32(inc_rowset_expired_sec, "1800");
// inc_rowset snapshot rs sweep time interval
CONF_mInt32(tablet_rowset_stale_sweep_time_sec, "1800");
// garbage sweep policy
Expand Down Expand Up @@ -248,8 +246,8 @@ CONF_mInt32(base_compaction_write_mbytes_per_sec, "5");
// size_based policy: an optimized version of cumulative compaction, targeting use cases that require
// lower write amplification, trading off read amplification and space amplification.
CONF_String(cumulative_compaction_policy, "size_based");
CONF_Validator(cumulative_compaction_policy, [](const std::string config) -> bool {
return config == "size_based" || config == "num_based";
CONF_Validator(cumulative_compaction_policy, [](const std::string& value) -> bool {
return value == "size_based" || value == "num_based";
});

// In size_based policy, output rowset of cumulative compaction total disk size exceed this config size,
Expand Down Expand Up @@ -530,10 +528,23 @@ CONF_Int32(flush_thread_num_per_store, "2");
CONF_mInt32(tablet_meta_checkpoint_min_new_rowsets_num, "10");
CONF_mInt32(tablet_meta_checkpoint_min_interval_secs, "600");
CONF_Int32(generate_tablet_meta_checkpoint_tasks_interval_secs, "600");
// Number of threads RocksDB uses for background flush and compaction.
// -1 means use the number of CPU cores.
CONF_Int32(rocksdb_thread_count, "-1");
// Capacity, in MB, of the RocksDB block cache (an LRU cache shared by all OlapMeta
// instances in this process). Must be positive.
// NOTE(review): OlapMeta::init computes `rocksdb_block_cache_mb * 1024 * 1024`; if that is
// evaluated in 32-bit int arithmetic, values >= 2048 would overflow -- confirm.
CONF_Int32(rocksdb_block_cache_mb, "64");
CONF_Validator(rocksdb_block_cache_mb, [](int value) -> bool {
return value > 0;
});
// Compression type applied to the RocksDB meta column family.
// Valid configs: LZ4, SNAPPY
CONF_String(rocksdb_compression_type, "LZ4");
CONF_Validator(rocksdb_compression_type, [](const std::string& value) -> bool {
return value == "LZ4" || value == "SNAPPY";
});

// config for default rowset type
// Valid configs: ALPHA, BETA
CONF_String(default_rowset_type, "BETA");
CONF_Validator(default_rowset_type, [](const std::string& value) -> bool {
return value == "ALPHA" || value == "BETA";
});

// Maximum size of a single message body in all protocols
CONF_Int64(brpc_max_body_size, "209715200");
Expand Down Expand Up @@ -603,4 +614,3 @@ CONF_Int32(aws_log_level, "3");
} // namespace doris

#endif // DORIS_BE_SRC_COMMON_CONFIG_H

52 changes: 2 additions & 50 deletions be/src/olap/data_dir.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -412,54 +412,7 @@ std::string DataDir::get_root_path_from_schema_hash_path_in_trash(
.string();
}

// Removes the tablet metas stored under HEADER_PREFIX and the rowset metas visited by
// RowsetMetaManager::traverse_rowset_metas from this data dir's meta store.
//
// A failure while traversing tablet metas is only logged as a warning. A failure while
// traversing rowset metas is logged at FATAL level. The function itself always returns
// OLAP_SUCCESS.
OLAPStatus DataDir::_clean_unfinished_converting_data() {
    auto clean_unfinished_tablet_meta_func = [this](int64_t tablet_id, int32_t schema_hash,
                                                    const std::string& value) -> bool {
        OLAPStatus remove_status =
                TabletMetaManager::remove(this, tablet_id, schema_hash, HEADER_PREFIX);
        // Only report success when the removal actually succeeded.
        if (remove_status != OLAP_SUCCESS) {
            LOG(WARNING) << "failed to clean temp tablet meta for tablet=" << tablet_id << "."
                         << schema_hash << " from data dir: " << _path
                         << ", status=" << remove_status;
        } else {
            LOG(INFO) << "successfully clean temp tablet meta for tablet=" << tablet_id << "."
                      << schema_hash << " from data dir: " << _path;
        }
        // Same return value as before; presumably tells the traversal to continue -- confirm.
        return true;
    };
    OLAPStatus clean_unfinished_meta_status = TabletMetaManager::traverse_headers(
            _meta, clean_unfinished_tablet_meta_func, HEADER_PREFIX);
    if (clean_unfinished_meta_status != OLAP_SUCCESS) {
        // If failed to clean meta just skip the error, there will be useless metas in rocksdb column family
        LOG(WARNING) << "there is failure when clean temp tablet meta from data dir=" << _path;
    } else {
        LOG(INFO) << "successfully clean temp tablet meta from data dir=" << _path;
    }

    auto clean_unfinished_rowset_meta_func = [this](TabletUid tablet_uid, RowsetId rowset_id,
                                                    const std::string& value) -> bool {
        RowsetMetaManager::remove(_meta, tablet_uid, rowset_id);
        LOG(INFO) << "successfully clean temp rowset meta for rowset_id=" << rowset_id
                  << " from data dir=" << _path;
        return true;
    };
    OLAPStatus clean_unfinished_rowset_meta_status =
            RowsetMetaManager::traverse_rowset_metas(_meta, clean_unfinished_rowset_meta_func);
    if (clean_unfinished_rowset_meta_status != OLAP_SUCCESS) {
        // Unlike the tablet meta cleanup above, a rowset meta traversal failure is not
        // skipped: it is reported at FATAL level.
        LOG(FATAL) << "fail to clean temp rowset meta from data dir=" << _path;
    } else {
        LOG(INFO) << "success to clean temp rowset meta from data dir=" << _path;
    }
    return OLAP_SUCCESS;
}

// Returns the current value of the _convert_old_data_success flag.
bool DataDir::convert_old_data_success() {
return _convert_old_data_success;
}

// Asks the meta store to record that tablet conversion has finished.
// Returns OLAP_SUCCESS on success; otherwise logs at FATAL level and returns the
// status reported by the meta store.
OLAPStatus DataDir::set_convert_finished() {
    const OLAPStatus status = _meta->set_tablet_convert_finished();
    if (status == OLAP_SUCCESS) {
        return OLAP_SUCCESS;
    }
    LOG(FATAL) << "save convert flag failed after convert old tablet. dir=" << _path;
    return status;
}

OLAPStatus DataDir::_check_incompatible_old_format_tablet() {
void DataDir::_check_incompatible_old_format_tablet() {
auto check_incompatible_old_func = [this](int64_t tablet_id, int32_t schema_hash,
const std::string& value) -> bool {
// if strict check incompatible old format, then log fatal
Expand All @@ -486,7 +439,6 @@ OLAPStatus DataDir::_check_incompatible_old_format_tablet() {
} else {
LOG(INFO) << "successfully check incompatible old format meta " << _path;
}
return check_incompatible_old_status;
}

// TODO(ygl): deal with rowsets and tablets when load failed
Expand Down Expand Up @@ -557,7 +509,7 @@ OLAPStatus DataDir::load() {
}
return true;
};
OLAPStatus load_tablet_status = TabletMetaManager::traverse_headers(_meta, load_tablet_func);
OLAPStatus load_tablet_status = TabletMetaManager::traverse_headers(_meta, load_tablet_func, HEADER_PREFIX);
if (failed_tablet_ids.size() != 0) {
LOG(WARNING) << "load tablets from header failed"
<< ", loaded tablet: " << tablet_ids.size()
Expand Down
10 changes: 1 addition & 9 deletions be/src/olap/data_dir.h
Original file line number Diff line number Diff line change
Expand Up @@ -108,10 +108,6 @@ class DataDir {

void perform_path_gc_by_tablet();

bool convert_old_data_success();

OLAPStatus set_convert_finished();

// check if the capacity reach the limit after adding the incoming data
// return true if limit reached, otherwise, return false.
// TODO(cmy): for now we can not precisely calculate the capacity Doris used,
Expand Down Expand Up @@ -141,11 +137,10 @@ class DataDir {
OLAPStatus _read_and_write_test_file();
Status _read_cluster_id(const std::string& cluster_id_path, int32_t* cluster_id);
Status _write_cluster_id_to_path(const std::string& path, int32_t cluster_id);
OLAPStatus _clean_unfinished_converting_data();
// Check whether any tablet meta in the old format (key starting with "hdr_") still exists in olap.
// Upgrading Doris to the current version with such metas present may lead to missing data.
// When conf::storage_strict_check_incompatible_old_format is true, the process will log fatal.
OLAPStatus _check_incompatible_old_format_tablet();
void _check_incompatible_old_format_tablet();

void _process_garbage_path(const std::string& path);

Expand Down Expand Up @@ -194,9 +189,6 @@ class DataDir {
RWMutex _pending_path_mutex;
std::set<std::string> _pending_path_ids;

// used in convert process
bool _convert_old_data_success;

std::shared_ptr<MetricEntity> _data_dir_metric_entity;
IntGauge* disks_total_capacity;
IntGauge* disks_avail_capacity;
Expand Down
24 changes: 11 additions & 13 deletions be/src/olap/olap_define.h
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ static const std::string ERROR_LOG_PREFIX = "/error_log";
static const std::string PENDING_DELTA_PREFIX = "/pending_delta";
static const std::string INCREMENTAL_DELTA_PREFIX = "/incremental_delta";
static const std::string CLONE_PREFIX = "/clone";
static const std::string META_POSTFIX = "/meta";

static const int32_t OLAP_DATA_VERSION_APPLIED = DORIS_V1;

Expand Down Expand Up @@ -286,7 +287,7 @@ enum OLAPStatus {
OLAP_ERR_HEADER_DELETE_VERSION = -1401,
OLAP_ERR_HEADER_ADD_PENDING_DELTA = -1402,
OLAP_ERR_HEADER_ADD_INCREMENTAL_VERSION = -1403,
OLAP_ERR_HEADER_INVALID_FLAG = -1404,
OLAP_ERR_HEADER_INVALID_FLAG = -1404, // deprecated
OLAP_ERR_HEADER_PUT = -1405,
OLAP_ERR_HEADER_DELETE = -1406,
OLAP_ERR_HEADER_GET = -1407,
Expand Down Expand Up @@ -375,24 +376,21 @@ enum OLAPStatus {

enum ColumnFamilyIndex {
DEFAULT_COLUMN_FAMILY_INDEX = 0,
DORIS_COLUMN_FAMILY_INDEX,
DORIS_COLUMN_FAMILY_INDEX, // deprecated
META_COLUMN_FAMILY_INDEX,
};

static const char* const HINIS_KEY_SEPARATOR = ";";
static const char* const HINIS_KEY_PAIR_SEPARATOR = "|";
static const char* const HINIS_KEY_GROUP_SEPARATOR = "&";
const std::string OLD_HEADER_PREFIX = "hdr_"; // deprecated, but it will be used to check old data
const std::string HEADER_PREFIX = "tabletmeta_";
const std::string ROWSET_PREFIX = "rst_";

static const std::string DEFAULT_COLUMN_FAMILY = "default";
static const std::string DORIS_COLUMN_FAMILY = "doris";
static const std::string META_COLUMN_FAMILY = "meta";
static const std::string END_ROWSET_ID = "end_rowset_id";
static const std::string CONVERTED_FLAG = "true";
static const std::string TABLET_CONVERT_FINISHED = "tablet_convert_finished";
const std::string TABLET_ID_KEY = "tablet_id";
const std::string TABLET_SCHEMA_HASH_KEY = "schema_hash";
const std::string TABLET_ID_PREFIX = "t_";
const std::string ROWSET_ID_PREFIX = "s_";

static const std::string TABLET_ID_KEY = "tablet_id";
static const std::string TABLET_SCHEMA_HASH_KEY = "schema_hash";
static const std::string TABLET_ID_PREFIX = "t_";
static const std::string ROWSET_ID_PREFIX = "s_";

#if defined(__GNUC__)
#define OLAP_LIKELY(x) __builtin_expect((x), 1)
Expand Down
72 changes: 39 additions & 33 deletions be/src/olap/olap_meta.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,11 @@
#include "common/logging.h"
#include "olap/olap_define.h"
#include "rocksdb/db.h"
#include "rocksdb/filter_policy.h"
#include "rocksdb/options.h"
#include "rocksdb/slice.h"
#include "rocksdb/slice_transform.h"
#include "rocksdb/table.h"
#include "util/cpu_info.h"
#include "util/doris_metrics.h"
#include "util/runtime_profile.h"

Expand All @@ -38,12 +40,22 @@ using rocksdb::ReadOptions;
using rocksdb::WriteOptions;
using rocksdb::Slice;
using rocksdb::Iterator;
using rocksdb::kDefaultColumnFamilyName;
using rocksdb::NewFixedPrefixTransform;

namespace doris {
const std::string META_POSTFIX = "/meta";
const size_t PREFIX_LENGTH = 4;

// Maps a configured compression name to the corresponding RocksDB compression type.
// Only "LZ4" and "SNAPPY" are recognized; any other name is reported at FATAL level.
rocksdb::CompressionType compression_type(const std::string& compression) {
    if (compression == "LZ4") {
        return rocksdb::kLZ4Compression;
    }
    if (compression == "SNAPPY") {
        return rocksdb::kSnappyCompression;
    }

    LOG(FATAL) << "compression type not recognized: " << compression;
    // Fallback value following the FATAL log above.
    return rocksdb::kNoCompression;
}

std::shared_ptr<rocksdb::Cache> OlapMeta::_s_block_cache;

OlapMeta::OlapMeta(const std::string& root_path) : _root_path(root_path), _db(nullptr) {}

Expand All @@ -61,19 +73,34 @@ OlapMeta::~OlapMeta() {
OLAPStatus OlapMeta::init() {
// init db
DBOptions options;
options.IncreaseParallelism();
int total_threads = config::rocksdb_thread_count;
if (total_threads == -1) {
total_threads = CpuInfo::num_cores();
}
options.IncreaseParallelism(total_threads);
options.create_if_missing = true;
options.create_missing_column_families = true;
std::string db_path = _root_path + META_POSTFIX;
std::vector<ColumnFamilyDescriptor> column_families;
// default column family is required
column_families.emplace_back(DEFAULT_COLUMN_FAMILY, ColumnFamilyOptions());
// Not used, but we have to open the default column family in order to open all column families
column_families.emplace_back(rocksdb::kDefaultColumnFamilyName, ColumnFamilyOptions());
// Deprecated, but we have to open it to keep compatible
column_families.emplace_back(DORIS_COLUMN_FAMILY, ColumnFamilyOptions());

// meta column family add prefix extractor to improve performance and ensure correctness
ColumnFamilyOptions meta_column_family;
meta_column_family.prefix_extractor.reset(NewFixedPrefixTransform(PREFIX_LENGTH));
column_families.emplace_back(META_COLUMN_FAMILY, meta_column_family);
ColumnFamilyOptions column_family_options;
column_family_options.compression = compression_type(config::rocksdb_compression_type);
column_family_options.memtable_prefix_bloom_size_ratio = 0.02;
{
rocksdb::BlockBasedTableOptions block_based_options;
block_based_options.filter_policy.reset(rocksdb::NewBloomFilterPolicy(10));
static std::once_flag flag;
std::call_once(flag, [&]() {
_s_block_cache = rocksdb::NewLRUCache(config::rocksdb_block_cache_mb * 1024 * 1024);
});
block_based_options.block_cache = _s_block_cache;
column_family_options.table_factory.reset(NewBlockBasedTableFactory(block_based_options));
}
column_families.emplace_back(META_COLUMN_FAMILY, column_family_options);
rocksdb::Status s = DB::Open(options, db_path, column_families, &_handles, &_db);
if (!s.ok() || _db == nullptr) {
LOG(WARNING) << "rocks db open failed, reason:" << s.ToString();
Expand Down Expand Up @@ -195,25 +222,4 @@ std::string OlapMeta::get_root_path() {
return _root_path;
}

// Reads the TABLET_CONVERT_FINISHED marker from the default column family.
//   - key not found, or stored value "false" -> flag = false, returns OLAP_SUCCESS
//   - stored value "true"                    -> flag = true,  returns OLAP_SUCCESS
//   - anything else                          -> flag untouched, logs a warning and
//                                               returns OLAP_ERR_HEADER_INVALID_FLAG
OLAPStatus OlapMeta::get_tablet_convert_finished(bool& flag) {
    std::string key = TABLET_CONVERT_FINISHED;
    std::string stored_flag;
    OLAPStatus status = get(DEFAULT_COLUMN_FAMILY_INDEX, key, &stored_flag);

    if (status == OLAP_ERR_META_KEY_NOT_FOUND || stored_flag == "false") {
        flag = false;
        return OLAP_SUCCESS;
    }
    if (stored_flag == "true") {
        flag = true;
        return OLAP_SUCCESS;
    }

    LOG(WARNING) << "invalid _is_header_converted. _is_header_converted=" << stored_flag;
    return OLAP_ERR_HEADER_INVALID_FLAG;
}

// Stores CONVERTED_FLAG under the TABLET_CONVERT_FINISHED key in the default column
// family and returns the status of that write.
OLAPStatus OlapMeta::set_tablet_convert_finished() {
    return put(DEFAULT_COLUMN_FAMILY_INDEX, TABLET_CONVERT_FINISHED, CONVERTED_FLAG);
}

} // namespace doris
14 changes: 6 additions & 8 deletions be/src/olap/olap_meta.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,7 @@
// specific language governing permissions and limitations
// under the License.

#ifndef DORIS_BE_SRC_OLAP_OLAP_OLAP_META_H
#define DORIS_BE_SRC_OLAP_OLAP_OLAP_META_H
#pragma once

#include <functional>
#include <map>
Expand Down Expand Up @@ -48,16 +47,15 @@ class OlapMeta {

std::string get_root_path();

OLAPStatus get_tablet_convert_finished(bool& flag);

OLAPStatus set_tablet_convert_finished();

private:
// All rocksdb instances on this server will share the same block cache.
// It's convenient to control the total memory used by this server, and the LRU
// algorithm used by the block cache can be more efficient in this way.
static std::shared_ptr<rocksdb::Cache> _s_block_cache;

std::string _root_path;
rocksdb::DB* _db;
std::vector<rocksdb::ColumnFamilyHandle*> _handles;
};

} // namespace doris

#endif // DORIS_BE_SRC_OLAP_OLAP_OLAP_META_H
2 changes: 0 additions & 2 deletions be/src/olap/rowset/rowset_meta_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,6 @@

namespace doris {

const std::string ROWSET_PREFIX = "rst_";

bool RowsetMetaManager::check_rowset_meta(OlapMeta* meta, TabletUid tablet_uid,
const RowsetId& rowset_id) {
std::string key = ROWSET_PREFIX + tablet_uid.to_string() + "_" + rowset_id.to_string();
Expand Down
25 changes: 4 additions & 21 deletions be/src/olap/tablet_meta_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -80,26 +80,10 @@ OLAPStatus TabletMetaManager::get_json_meta(DataDir* store, TTabletId tablet_id,
return OLAP_SUCCESS;
}

// TODO(ygl):
// 1. if term > 0 then save to remote meta store first using term
// 2. save to local meta store
//
// Serializes `tablet_meta` and writes it to the meta column family of the store's
// OlapMeta under the key "<header_prefix><tablet_id>_<schema_hash>".
// Returns the status of the underlying put.
OLAPStatus TabletMetaManager::save(DataDir* store, TTabletId tablet_id, TSchemaHash schema_hash,
                                   TabletMetaSharedPtr tablet_meta, const string& header_prefix) {
    std::stringstream key_builder;
    key_builder << header_prefix << tablet_id << "_" << schema_hash;
    std::string meta_key = key_builder.str();

    std::string serialized_meta;
    tablet_meta->serialize(&serialized_meta);

    OlapMeta* olap_meta = store->get_meta();
    LOG(INFO) << "save tablet meta"
              << ", key:" << meta_key << ", meta length:" << serialized_meta.length();
    return olap_meta->put(META_COLUMN_FAMILY_INDEX, meta_key, serialized_meta);
}

OLAPStatus TabletMetaManager::save(DataDir* store, TTabletId tablet_id, TSchemaHash schema_hash,
const std::string& meta_binary, const string& header_prefix) {
const std::string& meta_binary) {
std::stringstream key_stream;
key_stream << header_prefix << tablet_id << "_" << schema_hash;
key_stream << HEADER_PREFIX << tablet_id << "_" << schema_hash;
std::string key = key_stream.str();
VLOG_NOTICE << "save tablet meta to meta store: key = " << key;
OlapMeta* meta = store->get_meta();
Expand All @@ -112,10 +96,9 @@ OLAPStatus TabletMetaManager::save(DataDir* store, TTabletId tablet_id, TSchemaH
// TODO(ygl):
// 1. remove load data first
// 2. remove from load meta store using term if term > 0
OLAPStatus TabletMetaManager::remove(DataDir* store, TTabletId tablet_id, TSchemaHash schema_hash,
const string& header_prefix) {
OLAPStatus TabletMetaManager::remove(DataDir* store, TTabletId tablet_id, TSchemaHash schema_hash) {
std::stringstream key_stream;
key_stream << header_prefix << tablet_id << "_" << schema_hash;
key_stream << HEADER_PREFIX << tablet_id << "_" << schema_hash;
std::string key = key_stream.str();
OlapMeta* meta = store->get_meta();
LOG(INFO) << "start to remove tablet_meta, key:" << key;
Expand Down
Loading

0 comments on commit e864eb8

Please sign in to comment.