Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

dnm Record table memory usage 2 #9795

Closed
wants to merge 13 commits into from
73 changes: 57 additions & 16 deletions dbms/src/Storages/KVStore/Decode/RegionTable.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -46,9 +46,12 @@ namespace FailPoints
extern const char force_set_num_regions_for_table[];
} // namespace FailPoints

void RegionTable::InternalRegion::updateRegionCacheBytes(size_t cache_bytes_)
int64_t RegionTable::InternalRegion::updateRegionCacheBytes(size_t cache_bytes_)
{
LOG_INFO(DB::Logger::get(), "!!!!!! updateRegionCacheBytes cache_bytes {} cache_bytes_ {}", cache_bytes, cache_bytes_);
auto diff = static_cast<int64_t>(cache_bytes_) - static_cast<int64_t>(cache_bytes);
cache_bytes = cache_bytes_;
return diff;
}

RegionTable::Table & RegionTable::getOrCreateTable(const KeyspaceID keyspace_id, const TableID table_id)
Expand All @@ -64,31 +67,53 @@ RegionTable::Table & RegionTable::getOrCreateTable(const KeyspaceID keyspace_id,
return it->second;
}

RegionTable::Table & RegionTable::debugGetOrCreateTable(const KeyspaceID keyspace_id, const TableID table_id)
{
std::lock_guard lock(mutex);
return getOrCreateTable(keyspace_id, table_id);
}

bool RegionTable::debugHasTable(KeyspaceID keyspace_id, TableID table_id)
{
std::lock_guard lock(mutex);
auto ks_table_id = KeyspaceTableID{keyspace_id, table_id};
auto it = tables.find(ks_table_id);
if (it == tables.end())
{
return false;
}
return true;
}


RegionTable::InternalRegion & RegionTable::insertRegion(Table & table, const Region & region)
{
const auto range = region.getRange();
return insertRegion(table, *range, region.id());
return insertRegion(table, *range, region);
}

RegionTable::InternalRegion & RegionTable::insertRegion(
Table & table,
const RegionRangeKeys & region_range_keys,
const RegionID region_id)
const Region & region)
{
auto keyspace_id = region_range_keys.getKeyspaceID();
auto & table_regions = table.regions;
// Insert table mapping.
// todo check if region_range_keys.mapped_table_id == table.table_id ??
auto [it, ok] = table_regions.emplace(region_id, InternalRegion(region_id, region_range_keys.rawKeys()));
auto [it, ok] = table_regions.emplace(region.id(), InternalRegion(region.id(), region_range_keys.rawKeys()));
if (!ok)
throw Exception(
ErrorCodes::LOGICAL_ERROR,
"{}: insert duplicate internal region, region_id={}",
__PRETTY_FUNCTION__,
region_id);
region.id());

// Insert region mapping.
regions[region_id] = KeyspaceTableID{keyspace_id, table.table_id};
regions[region.id()] = KeyspaceTableID{keyspace_id, table.table_id};

LOG_INFO(DB::Logger::get(), "!!!!!! updateTableCacheBytes insert diff {}", region.totalSize());
table.sizeDiff(it->second.updateRegionCacheBytes(region.totalSize()));

return it->second;
}
Expand All @@ -98,6 +123,7 @@ RegionTable::InternalRegion & RegionTable::doGetInternalRegion(KeyspaceTableID k
return tables.find(ks_table_id)->second.regions.find(region_id)->second;
}


RegionTable::InternalRegion & RegionTable::getOrInsertRegion(const Region & region)
{
auto keyspace_id = region.getKeyspaceID();
Expand All @@ -110,6 +136,16 @@ RegionTable::InternalRegion & RegionTable::getOrInsertRegion(const Region & regi
return insertRegion(table, region);
}

void RegionTable::updateTableCacheBytes(const Region & region, int64_t diff)
{
auto keyspace_id = region.getKeyspaceID();
auto table_id = region.getMappedTableID();
auto & table = getOrCreateTable(keyspace_id, table_id);

LOG_INFO(DB::Logger::get(), "!!!!!! updateTableCacheBytes diff {}", diff);
table.sizeDiff(diff);
}

RegionTable::RegionTable(Context & context_)
: context(&context_)
, log(Logger::get())
Expand Down Expand Up @@ -161,7 +197,7 @@ void RegionTable::updateRegion(const Region & region)
{
std::lock_guard lock(mutex);
auto & internal_region = getOrInsertRegion(region);
internal_region.updateRegionCacheBytes(region.dataSize());
updateTableCacheBytes(region, internal_region.updateRegionCacheBytes(region.totalSize()));
}

namespace
Expand Down Expand Up @@ -313,7 +349,7 @@ RegionDataReadInfoList RegionTable::tryWriteBlockByRegion(const RegionPtr & regi

func_update_region([&](InternalRegion & internal_region) -> bool {
internal_region.pause_flush = false;
internal_region.updateRegionCacheBytes(region->dataSize());
updateTableCacheBytes(*region, internal_region.updateRegionCacheBytes(region->totalSize()));
return true;
});

Expand Down Expand Up @@ -383,32 +419,32 @@ void RegionTable::shrinkRegionRange(const Region & region)
std::lock_guard lock(mutex);
auto & internal_region = getOrInsertRegion(region);
internal_region.range_in_table = region.getRange()->rawKeys();
internal_region.updateRegionCacheBytes(region.dataSize());
updateTableCacheBytes(region, internal_region.updateRegionCacheBytes(region.totalSize()));
}

void RegionTable::extendRegionRange(const RegionID region_id, const RegionRangeKeys & region_range_keys)
void RegionTable::extendRegionRange(const Region & region, const RegionRangeKeys & region_range_keys)
{
std::lock_guard lock(mutex);

auto keyspace_id = region_range_keys.getKeyspaceID();
auto table_id = region_range_keys.getMappedTableID();
auto new_handle_range = region_range_keys.rawKeys();

if (auto it = regions.find(region_id); it != regions.end())
if (auto it = regions.find(region.id()); it != regions.end())
{
auto ks_tbl_id = KeyspaceTableID{keyspace_id, table_id};
RUNTIME_CHECK_MSG(
ks_tbl_id == it->second,
"{}: table id not match the previous one"
", region_id={} keyspace={} table_id={} old_keyspace={} old_table_id={}",
__PRETTY_FUNCTION__,
region_id,
region.id(),
keyspace_id,
table_id,
it->second.first,
it->second.second);

InternalRegion & internal_region = doGetInternalRegion(ks_tbl_id, region_id);
InternalRegion & internal_region = doGetInternalRegion(ks_tbl_id, region.id());

if (*(internal_region.range_in_table.first) <= *(new_handle_range.first)
&& *(internal_region.range_in_table.second) >= *(new_handle_range.second))
Expand All @@ -418,7 +454,7 @@ void RegionTable::extendRegionRange(const RegionID region_id, const RegionRangeK
"internal region has larger range, keyspace={} table_id={} region_id={}",
keyspace_id,
table_id,
region_id);
region.id());
}
else
{
Expand All @@ -433,8 +469,13 @@ void RegionTable::extendRegionRange(const RegionID region_id, const RegionRangeK
else
{
auto & table = getOrCreateTable(keyspace_id, table_id);
insertRegion(table, region_range_keys, region_id);
LOG_INFO(log, "insert internal region, keyspace={} table_id={} region_id={}", keyspace_id, table_id, region_id);
insertRegion(table, region_range_keys, region);
LOG_INFO(
log,
"insert internal region, keyspace={} table_id={} region_id={}",
keyspace_id,
table_id,
region.id());
}
}

Expand Down
24 changes: 19 additions & 5 deletions dbms/src/Storages/KVStore/Decode/RegionTable.h
Original file line number Diff line number Diff line change
Expand Up @@ -78,11 +78,11 @@ class RegionTable : private boost::noncopyable
, range_in_table(range_in_table_)
{}

void updateRegionCacheBytes(size_t);
int64_t updateRegionCacheBytes(size_t);

RegionID region_id;
std::pair<DecodedTiKVKeyPtr, DecodedTiKVKeyPtr> range_in_table;
bool pause_flush = false;
bool pause_flush = false; // TODO Can we remove this?

private:
Int64 cache_bytes = 0;
Expand All @@ -94,11 +94,19 @@ class RegionTable : private boost::noncopyable
{
explicit Table(const TableID table_id_)
: table_id(table_id_)
, size(0)
{}
void sizeDiff(Int64 diff) { size.fetch_add(diff); }
Int64 getSize() const { return size; }
TableID table_id;
InternalRegions regions;

private:
std::atomic_int64_t size;
};

void updateTableCacheBytes(const Region &, int64_t);

explicit RegionTable(Context & context_);
void restore();

Expand All @@ -108,7 +116,7 @@ class RegionTable : private boost::noncopyable
void shrinkRegionRange(const Region & region);

/// extend range for possible InternalRegion or add one.
void extendRegionRange(RegionID region_id, const RegionRangeKeys & region_range_keys);
void extendRegionRange(const Region & region, const RegionRangeKeys & region_range_keys);

void removeRegion(RegionID region_id, bool remove_data, const RegionTaskLock &);

Expand Down Expand Up @@ -150,6 +158,8 @@ class RegionTable : private boost::noncopyable
const LoggerPtr & log);

void clear();
bool debugHasTable(KeyspaceID keyspace_id, TableID table_id);
Table & debugGetOrCreateTable(KeyspaceID keyspace_id, TableID table_id);

public:
// safe ts is maintained by check_leader RPC (https://github.com/tikv/tikv/blob/1ea26a2ac8761af356cc5c0825eb89a0b8fc9749/components/resolved_ts/src/advance.rs#L262),
Expand Down Expand Up @@ -178,11 +188,13 @@ class RegionTable : private boost::noncopyable
private:
friend class MockTiDB;
friend class StorageDeltaMerge;
friend class RegionKVStoreTest_MemoryTracker;


Table & getOrCreateTable(KeyspaceID keyspace_id, TableID table_id);
void removeTable(KeyspaceID keyspace_id, TableID table_id);
Table & getOrCreateTable(KeyspaceID keyspace_id, TableID table_id);
InternalRegion & getOrInsertRegion(const Region & region);
InternalRegion & insertRegion(Table & table, const RegionRangeKeys & region_range_keys, RegionID region_id);
InternalRegion & insertRegion(Table & table, const RegionRangeKeys & region_range_keys, const Region & region);
InternalRegion & insertRegion(Table & table, const Region & region);
InternalRegion & doGetInternalRegion(KeyspaceTableID ks_table_id, RegionID region_id);

Expand All @@ -196,7 +208,9 @@ class RegionTable : private boost::noncopyable

Context * const context;

// For regions and tables.
mutable std::mutex mutex;
// For `safe_ts_map`
mutable std::shared_mutex rw_lock;

LoggerPtr log;
Expand Down
7 changes: 7 additions & 0 deletions dbms/src/Storages/KVStore/FFI/JointThreadAllocInfo.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -211,6 +211,13 @@ void JointThreadInfoJeallocMap::stopThreadAllocInfo()
}
}

void JointThreadInfoJeallocMap::debugClear()
{
std::unique_lock l(memory_allocation_mut);
proxy_map.clear();
storage_map.clear();
}

std::tuple<uint64_t *, uint64_t *> JointThreadInfoJeallocMap::getPtrs()
{
return getAllocDeallocPtr();
Expand Down
1 change: 1 addition & 0 deletions dbms/src/Storages/KVStore/FFI/JointThreadAllocInfo.h
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ class JointThreadInfoJeallocMap
~JointThreadInfoJeallocMap();
void recordThreadAllocInfo();
void stopThreadAllocInfo();
void debugClear();

// Call `thread.(de)allocatedp` for caller
static std::tuple<uint64_t *, uint64_t *> getPtrs();
Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Storages/KVStore/MultiRaft/ApplySnapshot.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -277,7 +277,7 @@ void KVStore::onSnapshot(
const auto range = new_region_wrap->getRange();
auto & region_table = tmt.getRegionTable();
// extend region to make sure data won't be removed.
region_table.extendRegionRange(region_id, *range);
region_table.extendRegionRange(*new_region_wrap, *range);
}

// Register the new Region.
Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Storages/KVStore/MultiRaft/RaftCommands.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,7 @@ RegionID RegionRaftCommandDelegate::execCommitMerge(
: source_region_meta_delegate.regionState().getRegion().end_key();

region_table.extendRegionRange(
id(),
*this,
RegionRangeKeys(TiKVKey::copyFrom(new_start_key), TiKVKey::copyFrom(new_end_key)));
}

Expand Down
7 changes: 7 additions & 0 deletions dbms/src/Storages/KVStore/Region.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,7 @@ RegionPtr Region::splitInto(RegionMeta && meta)
RegionPtr new_region = std::make_shared<Region>(std::move(meta), proxy_helper);

const auto range = new_region->getRange();

data.splitInto(range->comparableKeys(), new_region->data);

return new_region;
Expand Down Expand Up @@ -184,6 +185,12 @@ size_t Region::dataSize() const
return data.dataSize();
}

size_t Region::totalSize() const
{
return data.totalSize() + sizeof(RegionMeta);
}


size_t Region::writeCFCount() const
{
std::shared_lock<std::shared_mutex> lock(mutex);
Expand Down
2 changes: 2 additions & 0 deletions dbms/src/Storages/KVStore/Region.h
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,8 @@ class Region : public std::enable_shared_from_this<Region>

// Payload size in RegionData, show how much data flows in/out of the Region.
size_t dataSize() const;
// How much memory the Region consumes.
size_t totalSize() const;
size_t writeCFCount() const;
std::string dataInfo() const;

Expand Down
5 changes: 5 additions & 0 deletions dbms/src/Storages/KVStore/tests/gtest_new_kvstore.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ try
{RecordKVFormat::genKey(table_id, 11), RecordKVFormat::genKey(table_id, 20)},
{RecordKVFormat::genKey(table_id, 21), RecordKVFormat::genKey(table_id, 30)}});

auto & region_table = ctx.getTMTContext().getRegionTable();
{
// default
auto region_id = 1;
Expand All @@ -107,6 +108,10 @@ try
ASSERT_EQ(root_of_kvstore_mem_trackers->get(), str_key.dataSize() + str_val_default.size());
ASSERT_EQ(kvr1->dataSize(), root_of_kvstore_mem_trackers->get());
ASSERT_EQ(kvr1->dataSize(), kvr1->getData().totalSize());
ASSERT_TRUE(region_table.debugHasTable(NullspaceID, table_id));
auto & table1 = region_table.debugGetOrCreateTable(NullspaceID, table_id);
LOG_INFO(log, "!!!!!! dataSize {}", kvr1->dataSize());
ASSERT_EQ(kvr1->totalSize(), table1.getSize());
}
{
// lock
Expand Down