Skip to content

Commit

Permalink
Support compressed and local flash secondary cache stacking (#11812)
Browse files Browse the repository at this point in the history
Summary:
This PR implements support for a three tier cache - primary block cache, compressed secondary cache, and a nvm (local flash) secondary cache. This allows more effective utilization of the nvm cache, and minimizes the number of reads from local flash by caching compressed blocks in the compressed secondary cache.

The basic design is as follows -
1. A new secondary cache implementation, ```TieredSecondaryCache```, is introduced. It keeps the compressed and nvm secondary caches and manages the movement of blocks between them and the primary block cache. To setup a three tier cache, we allocate a ```CacheWithSecondaryAdapter```, with a ```TieredSecondaryCache``` instance as the secondary cache.
2. The table reader passes both the uncompressed and compressed block to ```FullTypedCacheInterface::InsertFull```, allowing the block cache to optionally store the compressed block.
3. When there's a miss, the block object is constructed and inserted in the primary cache, and the compressed block is inserted into the nvm cache by calling ```InsertSaved```. This avoids the overhead of recompressing the block, as well as avoiding putting more memory pressure on the compressed secondary cache.
4. When there's a hit in the nvm cache, we attempt to insert the block in the compressed secondary cache and the primary cache, subject to the admission policy of those caches (i.e admit on second access). Blocks/items evicted from any tier are simply discarded.

We can easily implement additional admission policies if desired.

Todo (In a subsequent PR):
1. Add to db_bench and run benchmarks
2. Add to db_stress

Pull Request resolved: #11812

Reviewed By: pdillinger

Differential Revision: D49461842

Pulled By: anand1976

fbshipit-source-id: b40ac1330ef7cd8c12efa0a3ca75128e602e3a0b
  • Loading branch information
anand1976 authored and facebook-github-bot committed Sep 22, 2023
1 parent b927ba5 commit 269478e
Show file tree
Hide file tree
Showing 49 changed files with 1,527 additions and 238 deletions.
2 changes: 2 additions & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -632,6 +632,7 @@ set(SOURCES
cache/secondary_cache.cc
cache/secondary_cache_adapter.cc
cache/sharded_cache.cc
cache/tiered_secondary_cache.cc
db/arena_wrapped_db_iter.cc
db/blob/blob_contents.cc
db/blob/blob_fetcher.cc
Expand Down Expand Up @@ -1263,6 +1264,7 @@ if(WITH_TESTS)
cache/cache_test.cc
cache/compressed_secondary_cache_test.cc
cache/lru_cache_test.cc
cache/tiered_secondary_cache_test.cc
db/blob/blob_counting_iterator_test.cc
db/blob/blob_file_addition_test.cc
db/blob/blob_file_builder_test.cc
Expand Down
3 changes: 3 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -1885,6 +1885,9 @@ compressed_secondary_cache_test: $(OBJ_DIR)/cache/compressed_secondary_cache_tes
lru_cache_test: $(OBJ_DIR)/cache/lru_cache_test.o $(TEST_LIBRARY) $(LIBRARY)
$(AM_LINK)

tiered_secondary_cache_test: $(OBJ_DIR)/cache/tiered_secondary_cache_test.o $(TEST_LIBRARY) $(LIBRARY)
$(AM_LINK)

range_del_aggregator_test: $(OBJ_DIR)/db/range_del_aggregator_test.o $(TEST_LIBRARY) $(LIBRARY)
$(AM_LINK)

Expand Down
7 changes: 7 additions & 0 deletions TARGETS
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ cpp_library_wrapper(name="rocksdb_lib", srcs=[
"cache/secondary_cache.cc",
"cache/secondary_cache_adapter.cc",
"cache/sharded_cache.cc",
"cache/tiered_secondary_cache.cc",
"db/arena_wrapped_db_iter.cc",
"db/blob/blob_contents.cc",
"db/blob/blob_fetcher.cc",
Expand Down Expand Up @@ -5475,6 +5476,12 @@ cpp_unittest_wrapper(name="tiered_compaction_test",
extra_compiler_flags=[])


cpp_unittest_wrapper(name="tiered_secondary_cache_test",
srcs=["cache/tiered_secondary_cache_test.cc"],
deps=[":rocksdb_test_lib"],
extra_compiler_flags=[])


cpp_unittest_wrapper(name="timer_queue_test",
srcs=["util/timer_queue_test.cc"],
deps=[":rocksdb_test_lib"],
Expand Down
35 changes: 35 additions & 0 deletions cache/cache.cc
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,41 @@ static std::unordered_map<std::string, OptionTypeInfo>
OptionTypeFlags::kMutable}},
};

namespace {
static void NoopDelete(Cache::ObjectPtr /*obj*/,
MemoryAllocator* /*allocator*/) {
assert(false);
}

static size_t SliceSize(Cache::ObjectPtr obj) {
return static_cast<Slice*>(obj)->size();
}

static Status SliceSaveTo(Cache::ObjectPtr from_obj, size_t from_offset,
size_t length, char* out) {
const Slice& slice = *static_cast<Slice*>(from_obj);
std::memcpy(out, slice.data() + from_offset, length);
return Status::OK();
}

static Status NoopCreate(const Slice& /*data*/, CompressionType /*type*/,
CacheTier /*source*/, Cache::CreateContext* /*ctx*/,
MemoryAllocator* /*allocator*/,
Cache::ObjectPtr* /*out_obj*/,
size_t* /*out_charge*/) {
assert(false);
return Status::NotSupported();
}

static Cache::CacheItemHelper kBasicCacheItemHelper(CacheEntryRole::kMisc,
&NoopDelete);
} // namespace

const Cache::CacheItemHelper kSliceCacheItemHelper{
CacheEntryRole::kMisc, &NoopDelete, &SliceSize,
&SliceSaveTo, &NoopCreate, &kBasicCacheItemHelper,
};

Status SecondaryCache::CreateFromString(
const ConfigOptions& config_options, const std::string& value,
std::shared_ptr<SecondaryCache>* result) {
Expand Down
3 changes: 2 additions & 1 deletion cache/cache_bench_tool.cc
Original file line number Diff line number Diff line change
Expand Up @@ -290,7 +290,8 @@ Status SaveToFn(Cache::ObjectPtr from_obj, size_t /*from_offset*/,
return Status::OK();
}

Status CreateFn(const Slice& data, Cache::CreateContext* /*context*/,
Status CreateFn(const Slice& data, CompressionType /*type*/,
CacheTier /*source*/, Cache::CreateContext* /*context*/,
MemoryAllocator* /*allocator*/, Cache::ObjectPtr* out_obj,
size_t* out_charge) {
*out_obj = new char[data.size()];
Expand Down
3 changes: 2 additions & 1 deletion cache/cache_helpers.cc
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,8 @@ Status WarmInCache(Cache* cache, const Slice& key, const Slice& saved,
assert(helper->create_cb);
Cache::ObjectPtr value;
size_t charge;
Status st = helper->create_cb(saved, create_context,
Status st = helper->create_cb(saved, CompressionType::kNoCompression,
CacheTier::kVolatileTier, create_context,
cache->memory_allocator(), &value, &charge);
if (st.ok()) {
st =
Expand Down
6 changes: 4 additions & 2 deletions cache/charged_cache.cc
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,10 @@ ChargedCache::ChargedCache(std::shared_ptr<Cache> cache,

Status ChargedCache::Insert(const Slice& key, ObjectPtr obj,
const CacheItemHelper* helper, size_t charge,
Handle** handle, Priority priority) {
Status s = target_->Insert(key, obj, helper, charge, handle, priority);
Handle** handle, Priority priority,
const Slice& compressed_val, CompressionType type) {
Status s = target_->Insert(key, obj, helper, charge, handle, priority,
compressed_val, type);
if (s.ok()) {
// Insert may cause the cache entry eviction if the cache is full. So we
// directly call the reservation manager to update the total memory used
Expand Down
8 changes: 5 additions & 3 deletions cache/charged_cache.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,11 @@ class ChargedCache : public CacheWrapper {
ChargedCache(std::shared_ptr<Cache> cache,
std::shared_ptr<Cache> block_cache);

Status Insert(const Slice& key, ObjectPtr obj, const CacheItemHelper* helper,
size_t charge, Handle** handle = nullptr,
Priority priority = Priority::LOW) override;
Status Insert(
const Slice& key, ObjectPtr obj, const CacheItemHelper* helper,
size_t charge, Handle** handle = nullptr,
Priority priority = Priority::LOW, const Slice& compressed_val = Slice(),
CompressionType type = CompressionType::kNoCompression) override;

Cache::Handle* Lookup(const Slice& key, const CacheItemHelper* helper,
CreateContext* create_context,
Expand Down
169 changes: 121 additions & 48 deletions cache/compressed_secondary_cache.cc
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@

#include "memory/memory_allocator_impl.h"
#include "monitoring/perf_context_imp.h"
#include "util/coding.h"
#include "util/compression.h"
#include "util/string_util.h"

Expand Down Expand Up @@ -54,40 +55,65 @@ std::unique_ptr<SecondaryCacheResultHandle> CompressedSecondaryCache::Lookup(
CacheAllocationPtr* ptr{nullptr};
CacheAllocationPtr merged_value;
size_t handle_value_charge{0};
const char* data_ptr = nullptr;
CacheTier source = CacheTier::kVolatileCompressedTier;
CompressionType type = cache_options_.compression_type;
if (cache_options_.enable_custom_split_merge) {
CacheValueChunk* value_chunk_ptr =
reinterpret_cast<CacheValueChunk*>(handle_value);
merged_value = MergeChunksIntoValue(value_chunk_ptr, handle_value_charge);
ptr = &merged_value;
data_ptr = ptr->get();
} else {
uint32_t type_32 = static_cast<uint32_t>(type);
uint32_t source_32 = static_cast<uint32_t>(source);
ptr = reinterpret_cast<CacheAllocationPtr*>(handle_value);
handle_value_charge = cache_->GetCharge(lru_handle);
data_ptr = ptr->get();
data_ptr = GetVarint32Ptr(data_ptr, data_ptr + 1,
static_cast<uint32_t*>(&type_32));
type = static_cast<CompressionType>(type_32);
data_ptr = GetVarint32Ptr(data_ptr, data_ptr + 1,
static_cast<uint32_t*>(&source_32));
source = static_cast<CacheTier>(source_32);
handle_value_charge -= (data_ptr - ptr->get());
}
MemoryAllocator* allocator = cache_options_.memory_allocator.get();

Status s;
Cache::ObjectPtr value{nullptr};
size_t charge{0};
if (cache_options_.compression_type == kNoCompression ||
cache_options_.do_not_compress_roles.Contains(helper->role)) {
s = helper->create_cb(Slice(ptr->get(), handle_value_charge),
create_context, allocator, &value, &charge);
} else {
UncompressionContext uncompression_context(cache_options_.compression_type);
UncompressionInfo uncompression_info(uncompression_context,
UncompressionDict::GetEmptyDict(),
cache_options_.compression_type);

size_t uncompressed_size{0};
CacheAllocationPtr uncompressed = UncompressData(
uncompression_info, (char*)ptr->get(), handle_value_charge,
&uncompressed_size, cache_options_.compress_format_version, allocator);

if (!uncompressed) {
cache_->Release(lru_handle, /*erase_if_last_ref=*/true);
return nullptr;
if (source == CacheTier::kVolatileCompressedTier) {
if (cache_options_.compression_type == kNoCompression ||
cache_options_.do_not_compress_roles.Contains(helper->role)) {
s = helper->create_cb(Slice(data_ptr, handle_value_charge),
kNoCompression, CacheTier::kVolatileTier,
create_context, allocator, &value, &charge);
} else {
UncompressionContext uncompression_context(
cache_options_.compression_type);
UncompressionInfo uncompression_info(uncompression_context,
UncompressionDict::GetEmptyDict(),
cache_options_.compression_type);

size_t uncompressed_size{0};
CacheAllocationPtr uncompressed =
UncompressData(uncompression_info, (char*)data_ptr,
handle_value_charge, &uncompressed_size,
cache_options_.compress_format_version, allocator);

if (!uncompressed) {
cache_->Release(lru_handle, /*erase_if_last_ref=*/true);
return nullptr;
}
s = helper->create_cb(Slice(uncompressed.get(), uncompressed_size),
kNoCompression, CacheTier::kVolatileTier,
create_context, allocator, &value, &charge);
}
s = helper->create_cb(Slice(uncompressed.get(), uncompressed_size),
} else {
// The item was not compressed by us. Let the helper create_cb
// uncompress it
s = helper->create_cb(Slice(data_ptr, handle_value_charge), type, source,
create_context, allocator, &value, &charge);
}

Expand All @@ -112,45 +138,56 @@ std::unique_ptr<SecondaryCacheResultHandle> CompressedSecondaryCache::Lookup(
return handle;
}

Status CompressedSecondaryCache::Insert(const Slice& key,
Cache::ObjectPtr value,
const Cache::CacheItemHelper* helper,
bool force_insert) {
if (value == nullptr) {
return Status::InvalidArgument();
bool CompressedSecondaryCache::MaybeInsertDummy(const Slice& key) {
auto internal_helper = GetHelper(cache_options_.enable_custom_split_merge);
Cache::Handle* lru_handle = cache_->Lookup(key);
if (lru_handle == nullptr) {
PERF_COUNTER_ADD(compressed_sec_cache_insert_dummy_count, 1);
// Insert a dummy handle if the handle is evicted for the first time.
cache_->Insert(key, /*obj=*/nullptr, internal_helper, /*charge=*/0)
.PermitUncheckedError();
return true;
} else {
cache_->Release(lru_handle, /*erase_if_last_ref=*/false);
}

if (disable_cache_) {
return false;
}

Status CompressedSecondaryCache::InsertInternal(
const Slice& key, Cache::ObjectPtr value,
const Cache::CacheItemHelper* helper, CompressionType type,
CacheTier source) {
if (source != CacheTier::kVolatileCompressedTier &&
cache_options_.enable_custom_split_merge) {
// We don't support custom split/merge for the tiered case
return Status::OK();
}

auto internal_helper = GetHelper(cache_options_.enable_custom_split_merge);
if (!force_insert) {
Cache::Handle* lru_handle = cache_->Lookup(key);
if (lru_handle == nullptr) {
PERF_COUNTER_ADD(compressed_sec_cache_insert_dummy_count, 1);
// Insert a dummy handle if the handle is evicted for the first time.
return cache_->Insert(key, /*obj=*/nullptr, internal_helper,
/*charge=*/0);
} else {
cache_->Release(lru_handle, /*erase_if_last_ref=*/false);
}
}

size_t size = (*helper->size_cb)(value);
char header[10];
char* payload = header;
payload = EncodeVarint32(payload, static_cast<uint32_t>(type));
payload = EncodeVarint32(payload, static_cast<uint32_t>(source));

size_t header_size = payload - header;
size_t data_size = (*helper->size_cb)(value);
size_t total_size = data_size + header_size;
CacheAllocationPtr ptr =
AllocateBlock(size, cache_options_.memory_allocator.get());
AllocateBlock(total_size, cache_options_.memory_allocator.get());
char* data_ptr = ptr.get() + header_size;

Status s = (*helper->saveto_cb)(value, 0, size, ptr.get());
Status s = (*helper->saveto_cb)(value, 0, data_size, data_ptr);
if (!s.ok()) {
return s;
}
Slice val(ptr.get(), size);
Slice val(data_ptr, data_size);

std::string compressed_val;
if (cache_options_.compression_type != kNoCompression &&
type == kNoCompression &&
!cache_options_.do_not_compress_roles.Contains(helper->role)) {
PERF_COUNTER_ADD(compressed_sec_cache_uncompressed_bytes, size);
PERF_COUNTER_ADD(compressed_sec_cache_uncompressed_bytes, data_size);
CompressionOptions compression_opts;
CompressionContext compression_context(cache_options_.compression_type,
compression_opts);
Expand All @@ -168,12 +205,14 @@ Status CompressedSecondaryCache::Insert(const Slice& key,
}

val = Slice(compressed_val);
size = compressed_val.size();
PERF_COUNTER_ADD(compressed_sec_cache_compressed_bytes, size);
data_size = compressed_val.size();
total_size = header_size + data_size;
PERF_COUNTER_ADD(compressed_sec_cache_compressed_bytes, data_size);

if (!cache_options_.enable_custom_split_merge) {
ptr = AllocateBlock(size, cache_options_.memory_allocator.get());
memcpy(ptr.get(), compressed_val.data(), size);
ptr = AllocateBlock(total_size, cache_options_.memory_allocator.get());
data_ptr = ptr.get() + header_size;
memcpy(data_ptr, compressed_val.data(), data_size);
}
}

Expand All @@ -184,9 +223,43 @@ Status CompressedSecondaryCache::Insert(const Slice& key,
SplitValueIntoChunks(val, cache_options_.compression_type, charge);
return cache_->Insert(key, value_chunks_head, internal_helper, charge);
} else {
std::memcpy(ptr.get(), header, header_size);
CacheAllocationPtr* buf = new CacheAllocationPtr(std::move(ptr));
return cache_->Insert(key, buf, internal_helper, size);
return cache_->Insert(key, buf, internal_helper, total_size);
}
}

Status CompressedSecondaryCache::Insert(const Slice& key,
Cache::ObjectPtr value,
const Cache::CacheItemHelper* helper,
bool force_insert) {
if (value == nullptr) {
return Status::InvalidArgument();
}

if (!force_insert && MaybeInsertDummy(key)) {
return Status::OK();
}

return InsertInternal(key, value, helper, kNoCompression,
CacheTier::kVolatileCompressedTier);
}

Status CompressedSecondaryCache::InsertSaved(
const Slice& key, const Slice& saved, CompressionType type = kNoCompression,
CacheTier source = CacheTier::kVolatileTier) {
if (type == kNoCompression) {
return Status::OK();
}

auto slice_helper = &kSliceCacheItemHelper;
if (MaybeInsertDummy(key)) {
return Status::OK();
}

return InsertInternal(
key, static_cast<Cache::ObjectPtr>(const_cast<Slice*>(&saved)),
slice_helper, type, source);
}

void CompressedSecondaryCache::Erase(const Slice& key) { cache_->Erase(key); }
Expand Down
9 changes: 9 additions & 0 deletions cache/compressed_secondary_cache.h
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,9 @@ class CompressedSecondaryCache : public SecondaryCache {
const Cache::CacheItemHelper* helper,
bool force_insert) override;

Status InsertSaved(const Slice& key, const Slice& saved, CompressionType type,
CacheTier source) override;

std::unique_ptr<SecondaryCacheResultHandle> Lookup(
const Slice& key, const Cache::CacheItemHelper* helper,
Cache::CreateContext* create_context, bool /*wait*/, bool advise_erase,
Expand Down Expand Up @@ -130,6 +133,12 @@ class CompressedSecondaryCache : public SecondaryCache {
CacheAllocationPtr MergeChunksIntoValue(const void* chunks_head,
size_t& charge);

bool MaybeInsertDummy(const Slice& key);

Status InsertInternal(const Slice& key, Cache::ObjectPtr value,
const Cache::CacheItemHelper* helper,
CompressionType type, CacheTier source);

// TODO: clean up to use cleaner interfaces in typed_cache.h
const Cache::CacheItemHelper* GetHelper(bool enable_custom_split_merge) const;
std::shared_ptr<Cache> cache_;
Expand Down
Loading

0 comments on commit 269478e

Please sign in to comment.