Skip to content

Commit

Permalink
Make CacheWithSecondaryAdapter reservation accounting more robust (#12059)
Browse files Browse the repository at this point in the history

Summary:
`CacheWithSecondaryAdapter` can distribute placeholder reservations across the primary and secondary caches. The current implementation of the accounting is quite complicated in order to avoid using a mutex. This may cause the accounting to be slightly off after changes to the cache capacity and ratio, resulting in assertion failures. There's also a bug in the unlikely event that the total reservation exceeds the cache capacity. Furthermore, the current implementation is difficult to reason about.

This PR simplifies it by doing the accounting while holding a mutex. The reservations are processed in 1MB chunks in order to avoid taking a lock too frequently. As a side effect, this also removes the restriction of not allowing to increase the compressed secondary cache capacity after decreasing it to 0.

Pull Request resolved: #12059

Test Plan: Existing unit tests, and a new test for capacity increase from 0

Reviewed By: pdillinger

Differential Revision: D51278686

Pulled By: anand1976

fbshipit-source-id: 7e1ad2c50694772997072dd59cab35c93c12ba4f
  • Loading branch information
anand1976 authored and facebook-github-bot committed Nov 15, 2023
1 parent a660e07 commit 2222cae
Show file tree
Hide file tree
Showing 4 changed files with 138 additions and 96 deletions.
68 changes: 44 additions & 24 deletions cache/compressed_secondary_cache_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1224,13 +1224,11 @@ TEST_P(CompressedSecCacheTestWithTiered, DynamicUpdate) {
ASSERT_OK(sec_cache->GetCapacity(sec_capacity));
ASSERT_EQ(sec_capacity, 0);

ASSERT_NOK(UpdateTieredCache(tiered_cache, -1, 0.3));
// Only check usage for LRU cache. HCC shows a 64KB usage for some reason
if (std::get<0>(GetParam()) == PrimaryCacheType::kCacheTypeLRU) {
ASSERT_EQ(GetCache()->GetUsage(), 0);
}
ASSERT_OK(UpdateTieredCache(tiered_cache, -1, 0.3));
EXPECT_PRED3(CacheUsageWithinBounds, GetCache()->GetUsage(), (30 << 20),
GetPercent(30 << 20, 1));
ASSERT_OK(sec_cache->GetCapacity(sec_capacity));
ASSERT_EQ(sec_capacity, 0);
ASSERT_EQ(sec_capacity, (30 << 20));
}

TEST_P(CompressedSecCacheTestWithTiered, DynamicUpdateWithReservation) {
Expand Down Expand Up @@ -1316,28 +1314,50 @@ TEST_P(CompressedSecCacheTestWithTiered, DynamicUpdateWithReservation) {
ASSERT_OK(sec_cache->GetCapacity(sec_capacity));
ASSERT_EQ(sec_capacity, 0);

ASSERT_OK(UpdateTieredCache(tiered_cache, -1, 0.3));
EXPECT_PRED3(CacheUsageWithinBounds, GetCache()->GetUsage(), (37 << 20),
GetPercent(37 << 20, 1));
EXPECT_PRED3(CacheUsageWithinBounds, sec_cache->TEST_GetUsage(), (3 << 20),
GetPercent(3 << 20, 1));
ASSERT_OK(sec_cache->GetCapacity(sec_capacity));
ASSERT_EQ(sec_capacity, 30 << 20);

ASSERT_OK(cache_res_mgr()->UpdateCacheReservation(0));
}

TEST_P(CompressedSecCacheTestWithTiered,
DynamicUpdateWithReservationUnderflow) {
// Verifies that placeholder reservations exceeding the total cache capacity
// are accounted for safely: the secondary cache is not charged beyond its
// capacity, and later ratio/reservation updates remain consistent.
// NOTE(review): relies on sync points to interleave a concurrent
// reserve/release with a capacity+ratio update — confirm the dependency
// names match those in CacheWithSecondaryAdapter.
TEST_P(CompressedSecCacheTestWithTiered, ReservationOverCapacity) {
CompressedSecondaryCache* sec_cache =
reinterpret_cast<CompressedSecondaryCache*>(GetSecondaryCache());
std::shared_ptr<Cache> tiered_cache = GetTieredCache();
// Force the ratio update to run between the two ChargeSecCache sync points
// inside Release, exercising the race being tested.
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency(
{{"CacheWithSecondaryAdapter::Release:ChargeSecCache1",
"CacheWithSecondaryAdapter::UpdateCacheReservationRatio:Begin"},
{"CacheWithSecondaryAdapter::UpdateCacheReservationRatio:End",
"CacheWithSecondaryAdapter::Release:ChargeSecCache2"}});
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();

// Concurrently reserve and release a small amount while the cache capacity
// and ratio are being updated below.
port::Thread reserve_release_thread([&]() {
EXPECT_EQ(cache_res_mgr()->UpdateCacheReservation(50), Status::OK());
EXPECT_EQ(cache_res_mgr()->UpdateCacheReservation(0), Status::OK());
});
ASSERT_OK(UpdateTieredCache(tiered_cache, 100 << 20, 0.01));
reserve_release_thread.join();
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();

ASSERT_OK(UpdateTieredCache(tiered_cache, 100 << 20, 0.3));

// Reserve 110MB — more than the 100MB total cache capacity.
ASSERT_OK(cache_res_mgr()->UpdateCacheReservation(110 << 20));
// Use EXPECT_PRED3 instead of EXPECT_NEAR to avoid too many size_t to
// double explicit casts
EXPECT_PRED3(CacheUsageWithinBounds, GetCache()->GetUsage(), (110 << 20),
GetPercent(110 << 20, 1));
// Secondary cache should be charged at most its full 30MB capacity.
EXPECT_PRED3(CacheUsageWithinBounds, sec_cache->TEST_GetUsage(), (30 << 20),
GetPercent(30 << 20, 1));
size_t sec_capacity;
ASSERT_OK(sec_cache->GetCapacity(sec_capacity));
ASSERT_EQ(sec_capacity, (30 << 20));

// Raise the ratio to 0.39 while still over-reserved; secondary capacity
// and usage should track the new ratio.
ASSERT_OK(UpdateTieredCache(tiered_cache, -1, 0.39));
EXPECT_PRED3(CacheUsageWithinBounds, GetCache()->GetUsage(), (110 << 20),
GetPercent(110 << 20, 1));
EXPECT_PRED3(CacheUsageWithinBounds, sec_cache->TEST_GetUsage(), (39 << 20),
GetPercent(39 << 20, 1));
ASSERT_OK(sec_cache->GetCapacity(sec_capacity));
ASSERT_EQ(sec_capacity, (39 << 20));

// Drop the reservation back under capacity (90MB) and verify the
// usage bounds asserted below; secondary capacity stays at 39MB.
ASSERT_OK(cache_res_mgr()->UpdateCacheReservation(90 << 20));
EXPECT_PRED3(CacheUsageWithinBounds, GetCache()->GetUsage(), (94 << 20),
GetPercent(94 << 20, 1));
EXPECT_PRED3(CacheUsageWithinBounds, sec_cache->TEST_GetUsage(), (35 << 20),
GetPercent(35 << 20, 1));
ASSERT_OK(sec_cache->GetCapacity(sec_capacity));
ASSERT_EQ(sec_capacity, (39 << 20));

// Release all reservations so the fixture tears down cleanly.
ASSERT_OK(cache_res_mgr()->UpdateCacheReservation(0));
}

INSTANTIATE_TEST_CASE_P(
Expand Down
145 changes: 79 additions & 66 deletions cache/secondary_cache_adapter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,10 @@ CacheWithSecondaryAdapter::CacheWithSecondaryAdapter(
: CacheWrapper(std::move(target)),
secondary_cache_(std::move(secondary_cache)),
adm_policy_(adm_policy),
distribute_cache_res_(distribute_cache_res) {
distribute_cache_res_(distribute_cache_res),
placeholder_usage_(0),
reserved_usage_(0),
sec_reserved_(0) {
target_->SetEvictionCallback(
[this](const Slice& key, Handle* handle, bool was_hit) {
return EvictionHandler(key, handle, was_hit);
Expand All @@ -103,8 +106,7 @@ CacheWithSecondaryAdapter::CacheWithSecondaryAdapter(
// secondary cache is freed from the reservation.
s = pri_cache_res_->UpdateCacheReservation(sec_capacity);
assert(s.ok());
sec_cache_res_ratio_.store((double)sec_capacity / target_->GetCapacity(),
std::memory_order_relaxed);
sec_cache_res_ratio_ = (double)sec_capacity / target_->GetCapacity();
}
}

Expand All @@ -113,7 +115,7 @@ CacheWithSecondaryAdapter::~CacheWithSecondaryAdapter() {
// use after free
target_->SetEvictionCallback({});
#ifndef NDEBUG
if (distribute_cache_res_ && !ratio_changed_) {
if (distribute_cache_res_) {
size_t sec_capacity = 0;
Status s = secondary_cache_->GetCapacity(sec_capacity);
assert(s.ok());
Expand Down Expand Up @@ -236,13 +238,31 @@ Status CacheWithSecondaryAdapter::Insert(const Slice& key, ObjectPtr value,
const Slice& compressed_value,
CompressionType type) {
Status s = target_->Insert(key, value, helper, charge, handle, priority);
if (s.ok() && value == nullptr && distribute_cache_res_) {
size_t sec_charge = static_cast<size_t>(
charge * (sec_cache_res_ratio_.load(std::memory_order_relaxed)));
s = secondary_cache_->Deflate(sec_charge);
assert(s.ok());
s = pri_cache_res_->UpdateCacheReservation(sec_charge, /*increase=*/false);
assert(s.ok());
if (s.ok() && value == nullptr && distribute_cache_res_ && handle) {
charge = target_->GetCharge(*handle);

MutexLock l(&cache_res_mutex_);
placeholder_usage_ += charge;
// Check if total placeholder reservation is more than the overall
// cache capacity. If it is, then we don't try to charge the
// secondary cache because we don't want to overcharge it (beyond
// its capacity).
// In order to make this a bit more lightweight, we also check if
// the difference between placeholder_usage_ and reserved_usage_ is
// at least kReservationChunkSize and avoid any adjustments if not.
if ((placeholder_usage_ <= target_->GetCapacity()) &&
((placeholder_usage_ - reserved_usage_) >= kReservationChunkSize)) {
reserved_usage_ = placeholder_usage_ & ~(kReservationChunkSize - 1);
size_t new_sec_reserved =
static_cast<size_t>(reserved_usage_ * sec_cache_res_ratio_);
size_t sec_charge = new_sec_reserved - sec_reserved_;
s = secondary_cache_->Deflate(sec_charge);
assert(s.ok());
s = pri_cache_res_->UpdateCacheReservation(sec_charge,
/*increase=*/false);
assert(s.ok());
sec_reserved_ += sec_charge;
}
}
// Warm up the secondary cache with the compressed block. The secondary
// cache may choose to ignore it based on the admission policy.
Expand Down Expand Up @@ -287,14 +307,27 @@ bool CacheWithSecondaryAdapter::Release(Handle* handle,
ObjectPtr v = target_->Value(handle);
if (v == nullptr && distribute_cache_res_) {
size_t charge = target_->GetCharge(handle);
size_t sec_charge = static_cast<size_t>(
charge * (sec_cache_res_ratio_.load(std::memory_order_relaxed)));
TEST_SYNC_POINT("CacheWithSecondaryAdapter::Release:ChargeSecCache1");
TEST_SYNC_POINT("CacheWithSecondaryAdapter::Release:ChargeSecCache2");
Status s = secondary_cache_->Inflate(sec_charge);
assert(s.ok());
s = pri_cache_res_->UpdateCacheReservation(sec_charge, /*increase=*/true);
assert(s.ok());

MutexLock l(&cache_res_mutex_);
placeholder_usage_ -= charge;
// Check if total placeholder reservation is more than the overall
// cache capacity. If it is, then we do nothing as reserved_usage_ must
// be already maxed out
if ((placeholder_usage_ <= target_->GetCapacity()) &&
(placeholder_usage_ < reserved_usage_)) {
// Adjust reserved_usage_ in chunks of kReservationChunkSize, so
// we don't hit this slow path too often.
reserved_usage_ = placeholder_usage_ & ~(kReservationChunkSize - 1);
size_t new_sec_reserved =
static_cast<size_t>(reserved_usage_ * sec_cache_res_ratio_);
size_t sec_charge = sec_reserved_ - new_sec_reserved;
Status s = secondary_cache_->Inflate(sec_charge);
assert(s.ok());
s = pri_cache_res_->UpdateCacheReservation(sec_charge,
/*increase=*/true);
assert(s.ok());
sec_reserved_ -= sec_charge;
}
}
}
return target_->Release(handle, erase_if_last_ref);
Expand Down Expand Up @@ -441,13 +474,11 @@ const char* CacheWithSecondaryAdapter::Name() const {
// where the new capacity < total cache reservations.
void CacheWithSecondaryAdapter::SetCapacity(size_t capacity) {
size_t sec_capacity = static_cast<size_t>(
capacity * (distribute_cache_res_
? sec_cache_res_ratio_.load(std::memory_order_relaxed)
: 0.0));
capacity * (distribute_cache_res_ ? sec_cache_res_ratio_ : 0.0));
size_t old_sec_capacity = 0;

if (distribute_cache_res_) {
MutexLock m(&mutex_);
MutexLock m(&cache_res_mutex_);

Status s = secondary_cache_->GetCapacity(old_sec_capacity);
if (!s.ok()) {
Expand All @@ -462,9 +493,17 @@ void CacheWithSecondaryAdapter::SetCapacity(size_t capacity) {
// 3. Decrease the primary cache capacity to the total budget
s = secondary_cache_->SetCapacity(sec_capacity);
if (s.ok()) {
if (placeholder_usage_ > capacity) {
// Adjust reserved_usage_ down
reserved_usage_ = capacity & ~(kReservationChunkSize - 1);
}
size_t new_sec_reserved =
static_cast<size_t>(reserved_usage_ * sec_cache_res_ratio_);
s = pri_cache_res_->UpdateCacheReservation(
old_sec_capacity - sec_capacity,
(old_sec_capacity - sec_capacity) -
(sec_reserved_ - new_sec_reserved),
/*increase=*/false);
sec_reserved_ = new_sec_reserved;
assert(s.ok());
target_->SetCapacity(capacity);
}
Expand Down Expand Up @@ -498,7 +537,7 @@ Status CacheWithSecondaryAdapter::GetSecondaryCachePinnedUsage(
size_t& size) const {
Status s;
if (distribute_cache_res_) {
MutexLock m(&mutex_);
MutexLock m(&cache_res_mutex_);
size_t capacity = 0;
s = secondary_cache_->GetCapacity(capacity);
if (s.ok()) {
Expand Down Expand Up @@ -526,12 +565,11 @@ Status CacheWithSecondaryAdapter::GetSecondaryCachePinnedUsage(
// in the future.
Status CacheWithSecondaryAdapter::UpdateCacheReservationRatio(
double compressed_secondary_ratio) {
if (!distribute_cache_res_ ||
sec_cache_res_ratio_.load(std::memory_order_relaxed) == 0.0) {
if (!distribute_cache_res_) {
return Status::NotSupported();
}

MutexLock m(&mutex_);
MutexLock m(&cache_res_mutex_);
size_t pri_capacity = target_->GetCapacity();
size_t sec_capacity =
static_cast<size_t>(pri_capacity * compressed_secondary_ratio);
Expand All @@ -541,51 +579,31 @@ Status CacheWithSecondaryAdapter::UpdateCacheReservationRatio(
return s;
}

TEST_SYNC_POINT(
"CacheWithSecondaryAdapter::UpdateCacheReservationRatio:Begin");

// There's a possible race condition here. Since the read of pri_cache_res_
// memory used (secondary cache usage charged to primary cache), and the
// change to sec_cache_res_ratio_ are not guarded by a mutex, its possible
// that an Insert/Release in another thread might decrease/increase the
// pri_cache_res_ reservation by the wrong amount. This should not be a
// problem because updating the sec/pri ratio is a rare operation, and
// the worst that can happen is we may over/under charge the secondary
// cache usage by a little bit. But we do need to protect against
// underflow of old_sec_reserved.
// TODO: Make the accounting more accurate by tracking the total memory
// reservation on the primary cache. This will also allow us to remove
// the restriction of not being able to change the sec/pri ratio from
// 0.0 to higher.
size_t sec_charge_to_pri = pri_cache_res_->GetTotalMemoryUsed();
size_t old_sec_reserved = (old_sec_capacity > sec_charge_to_pri)
? (old_sec_capacity - sec_charge_to_pri)
: 0;
// Calculate the new secondary cache reservation
size_t sec_reserved = static_cast<size_t>(
old_sec_reserved *
(double)(compressed_secondary_ratio /
sec_cache_res_ratio_.load(std::memory_order_relaxed)));
sec_cache_res_ratio_.store(compressed_secondary_ratio,
std::memory_order_relaxed);
// reserved_usage_ will never be > the cache capacity, so we don't
// have to worry about adjusting it here.
sec_cache_res_ratio_ = compressed_secondary_ratio;
size_t new_sec_reserved =
static_cast<size_t>(reserved_usage_ * sec_cache_res_ratio_);
if (sec_capacity > old_sec_capacity) {
// We're increasing the ratio, thus ending up with a larger secondary
// cache and a smaller usable primary cache capacity. Similar to
// SetCapacity(), we try to avoid a temporary increase in total usage
// beyond teh configured capacity -
// beyond the configured capacity -
// 1. A higher secondary cache ratio means it gets a higher share of
// cache reservations. So first account for that by deflating the
// secondary cache
// 2. Increase pri_cache_res_ reservation to reflect the new secondary
// cache utilization (increase in capacity - increase in share of cache
// reservation)
// 3. Increase secondary cache capacity
s = secondary_cache_->Deflate(sec_reserved - old_sec_reserved);
s = secondary_cache_->Deflate(new_sec_reserved - sec_reserved_);
assert(s.ok());
s = pri_cache_res_->UpdateCacheReservation(
(sec_capacity - old_sec_capacity) - (sec_reserved - old_sec_reserved),
(sec_capacity - old_sec_capacity) - (new_sec_reserved - sec_reserved_),
/*increase=*/true);
assert(s.ok());
sec_reserved_ = new_sec_reserved;
s = secondary_cache_->SetCapacity(sec_capacity);
assert(s.ok());
} else {
Expand All @@ -599,21 +617,16 @@ Status CacheWithSecondaryAdapter::UpdateCacheReservationRatio(
s = secondary_cache_->SetCapacity(sec_capacity);
if (s.ok()) {
s = pri_cache_res_->UpdateCacheReservation(
(old_sec_capacity - sec_capacity) - (old_sec_reserved - sec_reserved),
(old_sec_capacity - sec_capacity) -
(sec_reserved_ - new_sec_reserved),
/*increase=*/false);
assert(s.ok());
s = secondary_cache_->Inflate(old_sec_reserved - sec_reserved);
s = secondary_cache_->Inflate(sec_reserved_ - new_sec_reserved);
assert(s.ok());
sec_reserved_ = new_sec_reserved;
}
}

TEST_SYNC_POINT("CacheWithSecondaryAdapter::UpdateCacheReservationRatio:End");
#ifndef NDEBUG
// As mentioned in the function comments, we may accumulate some erros when
// the ratio is changed. We set a flag here which disables some assertions
// in the destructor
ratio_changed_ = true;
#endif
return s;
}

Expand Down
19 changes: 14 additions & 5 deletions cache/secondary_cache_adapter.h
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,8 @@ class CacheWithSecondaryAdapter : public CacheWrapper {
SecondaryCache* TEST_GetSecondaryCache() { return secondary_cache_.get(); }

private:
static constexpr size_t kReservationChunkSize = 1 << 20;

bool EvictionHandler(const Slice& key, Handle* handle, bool was_hit);

void StartAsyncLookupOnMySecondary(AsyncLookupHandle& async_handle);
Expand All @@ -84,11 +86,18 @@ class CacheWithSecondaryAdapter : public CacheWrapper {
std::shared_ptr<ConcurrentCacheReservationManager> pri_cache_res_;
// Fraction of a cache memory reservation to be assigned to the secondary
// cache
std::atomic<double> sec_cache_res_ratio_;
mutable port::Mutex mutex_;
#ifndef NDEBUG
bool ratio_changed_ = false;
#endif
double sec_cache_res_ratio_;
// Mutex for use when managing cache memory reservations. Should not be used
// for other purposes, as it may risk causing deadlocks.
mutable port::Mutex cache_res_mutex_;
// Total memory reserved by placeholder entries in the cache
size_t placeholder_usage_;
// Total placeholder memory charged to both the primary and secondary
// caches. Will be <= placeholder_usage_.
size_t reserved_usage_;
// Amount of memory reserved in the secondary cache. This should be
// reserved_usage_ * sec_cache_res_ratio_ in steady state.
size_t sec_reserved_;
};

} // namespace ROCKSDB_NAMESPACE
2 changes: 1 addition & 1 deletion db_stress_tool/db_stress_common.cc
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,7 @@ void CompressedCacheSetCapacityThread(void* v) {
s.ToString().c_str());
}
} else if (FLAGS_compressed_secondary_cache_ratio > 0.0) {
if (thread->rand.OneIn(2)) {
if (thread->rand.OneIn(2)) {
size_t capacity = block_cache->GetCapacity();
size_t adjustment;
if (FLAGS_use_write_buffer_manager && FLAGS_db_write_buffer_size > 0) {
Expand Down

0 comments on commit 2222cae

Please sign in to comment.