Make discardable ratio and compression type online configurable (#310)
Signed-off-by: Yang Zhang <[email protected]>
v01dstar authored Feb 29, 2024
1 parent 1144c7a commit 5a9cee8
Showing 4 changed files with 188 additions and 44 deletions.
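
Below is a minimal usage sketch (not part of this commit) of how the two newly mutable options can be changed at runtime through SetOptions(), mirroring what the new tests in src/titan_db_test.cc exercise; the function name AdjustTitanOptions and the concrete option values are illustrative assumptions.

// Sketch: change blob_file_compression and blob_file_discardable_ratio
// online, as exercised by the new tests added in this commit.
#include <cassert>
#include <string>
#include <unordered_map>
#include "titan/db.h"

void AdjustTitanOptions(rocksdb::titandb::TitanDB* db) {
  // Keys are parsed by TitanDBImpl::ExtractTitanCfOptions; any keys Titan
  // does not recognize stay in the map and are handed on to RocksDB.
  std::unordered_map<std::string, std::string> opts = {
      {"blob_file_compression", "kLZ4Compression"},
      {"blob_file_discardable_ratio", "0.4"}};
  rocksdb::Status s = db->SetOptions(opts);
  assert(s.ok());
  // The updated values are visible through GetTitanOptions().
  auto titan_options = db->GetTitanOptions();
  assert(titan_options.blob_file_compression == rocksdb::kLZ4Compression);
}
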
15 changes: 7 additions & 8 deletions include/titan/options.h
@@ -184,18 +184,14 @@ struct ImmutableTitanCFOptions {
ImmutableTitanCFOptions() : ImmutableTitanCFOptions(TitanCFOptions()) {}

explicit ImmutableTitanCFOptions(const TitanCFOptions& opts)
: blob_file_compression(opts.blob_file_compression),
blob_file_target_size(opts.blob_file_target_size),
: blob_file_target_size(opts.blob_file_target_size),
blob_cache(opts.blob_cache),
max_gc_batch_size(opts.max_gc_batch_size),
min_gc_batch_size(opts.min_gc_batch_size),
blob_file_discardable_ratio(opts.blob_file_discardable_ratio),
merge_small_file_threshold(opts.merge_small_file_threshold),
level_merge(opts.level_merge),
skip_value_in_compaction_filter(opts.skip_value_in_compaction_filter) {}

CompressionType blob_file_compression;

uint64_t blob_file_target_size;

std::shared_ptr<Cache> blob_cache;
@@ -204,8 +200,6 @@ struct ImmutableTitanCFOptions {

uint64_t min_gc_batch_size;

double blob_file_discardable_ratio;

uint64_t merge_small_file_threshold;

bool level_merge;
@@ -217,10 +211,15 @@ struct MutableTitanCFOptions {
MutableTitanCFOptions() : MutableTitanCFOptions(TitanCFOptions()) {}

explicit MutableTitanCFOptions(const TitanCFOptions& opts)
: blob_run_mode(opts.blob_run_mode), min_blob_size(opts.min_blob_size) {}
: blob_run_mode(opts.blob_run_mode),
min_blob_size(opts.min_blob_size),
blob_file_compression(opts.blob_file_compression),
blob_file_discardable_ratio(opts.blob_file_discardable_ratio) {}

TitanBlobRunMode blob_run_mode;
uint64_t min_blob_size;
CompressionType blob_file_compression;
double blob_file_discardable_ratio;
};

struct TitanOptions : public TitanDBOptions, public TitanCFOptions {
94 changes: 60 additions & 34 deletions src/db_impl.cc
@@ -9,6 +9,7 @@
#include "db/arena_wrapped_db_iter.h"
#include "logging/log_buffer.h"
#include "monitoring/statistics_impl.h"
#include "options/options_helper.h"
#include "port/port.h"
#include "util/autovector.h"
#include "util/mutexlock.h"
@@ -1099,31 +1100,56 @@ Status TitanDBImpl::ExtractTitanCfOptions(
ColumnFamilyHandle* column_family,
std::unordered_map<std::string, std::string>& new_options,
MutableTitanCFOptions& mutable_cf_options, bool& changed) {
auto option_pos = new_options.find("blob_run_mode");
if (option_pos != new_options.end()) {
const std::string& blob_run_mode_string = option_pos->second;
auto run_mode_mapping_pos =
blob_run_mode_string_map.find(blob_run_mode_string);
if (run_mode_mapping_pos == blob_run_mode_string_map.end()) {
return Status::InvalidArgument("No blob_run_mode defined for " +
blob_run_mode_string);
for (auto it = new_options.begin(); it != new_options.end();) {
const std::string& name = it->first;
const std::string& value = it->second;
if (name == "blob_run_mode") {
auto run_mode_mapping_pos = blob_run_mode_string_map.find(value);
if (run_mode_mapping_pos == blob_run_mode_string_map.end()) {
return Status::InvalidArgument("No blob_run_mode defined for " + value);
} else {
mutable_cf_options.blob_run_mode = run_mode_mapping_pos->second;
TITAN_LOG_INFO(db_options_.info_log, "[%s] Set blob_run_mode: %s",
column_family->GetName().c_str(), value.c_str());
}
it = new_options.erase(it);
changed = true;
} else if (name == "min_blob_size") {
uint64_t min_blob_size = ParseUint64(value);
mutable_cf_options.min_blob_size = min_blob_size;
TITAN_LOG_INFO(db_options_.info_log, "[%s] Set min_blob_size: %" PRIu64,
column_family->GetName().c_str(), min_blob_size);
it = new_options.erase(it);
changed = true;
} else if (name == "blob_file_compression") {
auto compression_type_mapping_pos =
compression_type_string_map.find(value);
if (compression_type_mapping_pos == compression_type_string_map.end()) {
return Status::InvalidArgument("No compression type defined for " +
value);
} else {
mutable_cf_options.blob_file_compression =
compression_type_mapping_pos->second;
TITAN_LOG_INFO(db_options_.info_log,
"[%s] Set blob_file_compression: %s",
column_family->GetName().c_str(), value.c_str());
}
it = new_options.erase(it);
changed = true;
} else if (name == "blob_file_discardable_ratio") {
double blob_file_discardable_ratio = ParseDouble(value);
mutable_cf_options.blob_file_discardable_ratio =
blob_file_discardable_ratio;
TITAN_LOG_INFO(
db_options_.info_log, "[%s] Set blob_file_discardable_ratio: %f",
column_family->GetName().c_str(), blob_file_discardable_ratio);
it = new_options.erase(it);
changed = true;
} else {
mutable_cf_options.blob_run_mode = run_mode_mapping_pos->second;
TITAN_LOG_INFO(db_options_.info_log, "[%s] Set blob_run_mode: %s",
column_family->GetName().c_str(),
blob_run_mode_string.c_str());
++it;
}
new_options.erase(option_pos);
changed = true;
}

option_pos = new_options.find("min_blob_size");
if (option_pos != new_options.end()) {
uint64_t min_blob_size = ParseUint64(option_pos->second);
mutable_cf_options.min_blob_size = min_blob_size;
new_options.erase(option_pos);
changed = true;
}
return Status::OK();
}

@@ -1282,8 +1308,8 @@ void TitanDBImpl::OnFlushCompleted(const FlushJobInfo& flush_job_info) {
".",
flush_job_info.job_id, file->file_number());
} else {
// No need to set live data size here, because it's already set in table
// builder
// No need to set live data size here, because it's already set in
// table builder
file->FileStateTransit(BlobFileMeta::FileEvent::kFlushCompleted);
TITAN_LOG_INFO(db_options_.info_log,
"OnFlushCompleted[%d]: output blob file %" PRIu64
@@ -1321,12 +1347,12 @@ void TitanDBImpl::OnCompactionCompleted(
prop_iter->second, to_add, &blob_file_size_diff);
if (!gc_stats_status.ok()) {
// TODO: Should treat it as background error and make DB read-only.
TITAN_LOG_ERROR(
db_options_.info_log,
"OnCompactionCompleted[%d]: failed to extract GC stats from table "
"property: compaction file: %s, error: %s",
compaction_job_info.job_id, file_name.c_str(),
gc_stats_status.ToString().c_str());
TITAN_LOG_ERROR(db_options_.info_log,
"OnCompactionCompleted[%d]: failed to extract GC "
"stats from table "
"property: compaction file: %s, error: %s",
compaction_job_info.job_id, file_name.c_str(),
gc_stats_status.ToString().c_str());
assert(false);
}
}
@@ -1378,9 +1404,9 @@
// flush/compaction is still `kPendingLSM`, while the blob file size
// delta is for the later compaction event, and it is possible that
// delta is negative.
// If the delta is positive, it means the blob file is the output of the
// compaction and the live data size is already in table builder. So
// here only update live data size when negative.
// If the delta is positive, it means the blob file is the output of
// the compaction and the live data size is already in table builder.
// So here only update live data size when negative.
if (delta < 0) {
file->UpdateLiveDataSize(delta);
}
@@ -1409,8 +1435,8 @@
}
file->UpdateLiveDataSize(delta);
if (cf_options.level_merge) {
// After level merge, most entries of merged blob files are written to
// new blob files. Delete blob files which have no live data.
// After level merge, most entries of merged blob files are written
// to new blob files. Delete blob files which have no live data.
// Mark last two level blob files to merge in next compaction if
// discardable size reached GC threshold
if (file->NoLiveData()) {
6 changes: 4 additions & 2 deletions src/options.cc
@@ -34,12 +34,12 @@ TitanCFOptions::TitanCFOptions(const ColumnFamilyOptions& cf_opts,
const MutableTitanCFOptions& mutable_opts)
: ColumnFamilyOptions(cf_opts),
min_blob_size(mutable_opts.min_blob_size),
blob_file_compression(immutable_opts.blob_file_compression),
blob_file_compression(mutable_opts.blob_file_compression),
blob_file_target_size(immutable_opts.blob_file_target_size),
blob_cache(immutable_opts.blob_cache),
max_gc_batch_size(immutable_opts.max_gc_batch_size),
min_gc_batch_size(immutable_opts.min_gc_batch_size),
blob_file_discardable_ratio(immutable_opts.blob_file_discardable_ratio),
blob_file_discardable_ratio(mutable_opts.blob_file_discardable_ratio),
merge_small_file_threshold(immutable_opts.merge_small_file_threshold),
blob_run_mode(mutable_opts.blob_run_mode),
skip_value_in_compaction_filter(
@@ -100,6 +100,8 @@ void TitanCFOptions::UpdateMutableOptions(
const MutableTitanCFOptions& new_options) {
blob_run_mode = new_options.blob_run_mode;
min_blob_size = new_options.min_blob_size;
blob_file_compression = new_options.blob_file_compression;
blob_file_discardable_ratio = new_options.blob_file_discardable_ratio;
}

std::map<TitanBlobRunMode, std::string>
117 changes: 117 additions & 0 deletions src/titan_db_test.cc
@@ -2120,6 +2120,123 @@ TEST_F(TitanDBTest, OnlineChangeMinBlobSize) {
}
}
}

TEST_F(TitanDBTest, OnlineChangeCompressionType) {
const uint64_t kNumKeys = 100;
std::map<std::string, std::string> data;
Open();

std::unordered_map<std::string, std::string> opts = {
{"blob_file_compression", "kNoCompression"}};
ASSERT_OK(db_->SetOptions(opts));
auto titan_options = db_->GetTitanOptions();
ASSERT_EQ(kNoCompression, titan_options.blob_file_compression);
for (uint64_t k = 1; k <= kNumKeys; k++) {
Put(k, &data);
}
Flush();
auto blob_storage = GetBlobStorage(db_->DefaultColumnFamily());
std::map<uint64_t, std::weak_ptr<BlobFileMeta>> blob_files;
blob_storage.lock()->ExportBlobFiles(blob_files);
ASSERT_EQ(1, blob_files.size());
auto blob_file = blob_files.begin();
VerifyBlob(blob_file->first, data);
auto first_blob_file_number = blob_file->first;
auto first_blob_file_size = blob_file->second.lock()->file_size();

data.clear();
for (uint64_t k = 1; k <= kNumKeys; k++) {
Put(k, &data);
}
Flush();
blob_files.clear();
blob_storage.lock()->ExportBlobFiles(blob_files);
ASSERT_EQ(2, blob_files.size());
uint64_t second_blob_file_number = 0;
for (const auto& pair : blob_files) {
if (pair.first != first_blob_file_number) {
second_blob_file_number = pair.first;
ASSERT_EQ(first_blob_file_size, pair.second.lock()->file_size());
VerifyBlob(pair.first, data);
}
}

opts = {{"blob_file_compression", "kLZ4Compression"}};
ASSERT_OK(db_->SetOptions(opts));
titan_options = db_->GetTitanOptions();
ASSERT_EQ(kLZ4Compression, titan_options.blob_file_compression);

data.clear();
for (uint64_t k = 1; k <= kNumKeys; k++) {
Put(k, &data);
}
Flush();
blob_files.clear();
blob_storage.lock()->ExportBlobFiles(blob_files);
ASSERT_EQ(3, blob_files.size());
for (const auto& pair : blob_files) {
if (pair.first != first_blob_file_number &&
pair.first != second_blob_file_number) {
VerifyBlob(pair.first, data);
// The third blob file should be smaller, since it is compressed.
ASSERT_GT(first_blob_file_size, pair.second.lock()->file_size());
}
}
}

TEST_F(TitanDBTest, OnlineChangeBlobFileDiscardableRatio) {
options_.min_blob_size = 0;
const uint64_t kNumKeys = 100;
std::map<std::string, std::string> data;
Open();
uint32_t default_cf_id = db_->DefaultColumnFamily()->GetID();

std::unordered_map<std::string, std::string> opts = {
{"blob_file_discardable_ratio", "0.8"}};
ASSERT_OK(db_->SetOptions(opts));
for (uint64_t k = 1; k <= kNumKeys; k++) {
auto key = GenKey(k);
ASSERT_OK(db_->Put(WriteOptions(), key, "v"));
}
Flush();

data.clear();
for (uint64_t k = 1; k <= kNumKeys; k++) {
if (k % 2 == 0) {
Delete(k);
}
}
Flush();
CompactAll();

db_impl_->TEST_StartGC(default_cf_id);
db_impl_->TEST_WaitForBackgroundGC();

auto blob_storage = GetBlobStorage(db_->DefaultColumnFamily());
ASSERT_EQ(blob_storage.lock()->cf_options().blob_file_discardable_ratio, 0.8);
std::map<uint64_t, std::weak_ptr<BlobFileMeta>> blob_files;
blob_storage.lock()->ExportBlobFiles(blob_files);
ASSERT_EQ(1, blob_files.size());
auto blob_file = blob_files.begin();
ASSERT_GT(blob_file->second.lock()->GetDiscardableRatio(), 0.4);

opts = {{"blob_file_discardable_ratio", "0.4"}};
ASSERT_OK(db_->SetOptions(opts));
auto titan_options = db_->GetTitanOptions();
ASSERT_EQ(0.4, titan_options.blob_file_discardable_ratio);

db_impl_->TEST_StartGC(default_cf_id);
db_impl_->TEST_WaitForBackgroundGC();
ASSERT_OK(db_impl_->TEST_PurgeObsoleteFiles());

blob_files.clear();
blob_storage.lock()->ExportBlobFiles(blob_files);
ASSERT_EQ(1, blob_files.size());
blob_file = blob_files.begin();
// The discardable ratio should be updated after GC.
ASSERT_LT(blob_file->second.lock()->GetDiscardableRatio(), 0.4);
}
} // namespace titandb
} // namespace rocksdb
