Skip to content

Commit

Permalink
Optimize reading of account states (#99)
Browse files Browse the repository at this point in the history
  • Loading branch information
dungeon-master-666 authored Jan 17, 2025
1 parent 67b6d66 commit 0614122
Show file tree
Hide file tree
Showing 8 changed files with 293 additions and 43 deletions.
1 change: 1 addition & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ add_subdirectory(ton-index-clickhouse)
add_subdirectory(ton-integrity-checker)
add_subdirectory(ton-smc-scanner)
add_subdirectory(ton-trace-emulator)
add_subdirectory(celldb-migrate)

if (PGTON)
message("Building pgton")
Expand Down
12 changes: 12 additions & 0 deletions celldb-migrate/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
# Build configuration for the celldb-migrate tool: a standalone executable
# that rewrites CellDB entries to a new compress-depth setting.
cmake_minimum_required(VERSION 3.16)

add_executable(celldb-migrate
src/main.cpp
)
# Headers live next to the sources; exported publicly in case other targets
# ever link against this one.
target_include_directories(celldb-migrate
PUBLIC src
)
# The tool uses C++17 features (structured bindings, init-capture lambdas).
target_compile_features(celldb-migrate PRIVATE cxx_std_17)
# tondb-scanner transitively provides ton crypto/vm, tdutils and rocksdb.
target_link_libraries(celldb-migrate tondb-scanner)

install(TARGETS celldb-migrate RUNTIME DESTINATION bin)
231 changes: 231 additions & 0 deletions celldb-migrate/src/main.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,231 @@
#include "td/utils/port/signals.h"
#include "td/utils/OptionParser.h"
#include "td/utils/format.h"
#include "td/utils/logging.h"
#include "td/utils/check.h"
#include "td/utils/port/path.h"
#include "td/actor/actor.h"
#include "crypto/vm/cp0.h"
#include "tddb/td/db/RocksDb.h"
#include <iostream>
#include "rocksdb/utilities/optimistic_transaction_db.h"
#include "crypto/vm/db/DynamicBagOfCellsDb.h"
#include "crypto/vm/db/CellStorage.h"


// Translate a rocksdb::Status into the equivalent td::Status,
// preserving the error text on failure.
static td::Status from_rocksdb(rocksdb::Status status) {
  return status.ok() ? td::Status::OK() : td::Status::Error(status.ToString());
}
// View a rocksdb::Slice as a td::Slice — no copy, same underlying bytes.
static td::Slice from_rocksdb(rocksdb::Slice slice) {
  td::Slice view{slice.data(), slice.size()};
  return view;
}
// View a td::Slice as a rocksdb::Slice — no copy, same underlying bytes.
static rocksdb::Slice to_rocksdb(td::Slice slice) {
  rocksdb::Slice view{slice.data(), slice.size()};
  return view;
}

// Actor that migrates one contiguous key range [from_, to_) of the CellDB.
// It iterates every 32-byte cell-hash key in the range, reloads the cell,
// and rewrites it via vm::CellStorer when the stored representation does
// not match the desired compress-depth. Fulfils promise_ when the whole
// range has been processed, then stops itself.
class MigrateBatchActor: public td::actor::Actor {
  td::Bits256 from_;              // inclusive lower bound of the key range
  td::Bits256 to_;                // exclusive upper bound of the key range
  std::shared_ptr<td::RocksDb> db_;
  int new_compress_depth_;        // target compress depth for stored cells
  td::Promise<td::Unit> promise_; // completed once the batch is done

  std::unique_ptr<vm::CellLoader> loader_;
  std::shared_ptr<vm::DynamicBagOfCellsDb> boc_;

  uint32_t migrated_{0};          // number of cells actually rewritten
  public:
  MigrateBatchActor(td::Bits256 from, td::Bits256 to, std::shared_ptr<td::RocksDb> db, int new_compress_depth, td::Promise<td::Unit> promise)
      : from_(from), to_(to), db_(db), new_compress_depth_(new_compress_depth), promise_(std::move(promise)) {

    loader_ = std::make_unique<vm::CellLoader>(db_);
    boc_ = vm::DynamicBagOfCellsDb::create();
    boc_->set_celldb_compress_depth(new_compress_depth_); // probably not necessary in this context
    boc_->set_loader(std::make_unique<vm::CellLoader>(db_));
  }

  // Walks the key range with a raw RocksDB iterator inside a single write
  // batch, migrating each cell, then commits and signals completion.
  void start_up() override {
    vm::CellStorer storer{*db_};

    std::unique_ptr<rocksdb::Iterator> it;
    it.reset(db_->raw_db()->NewIterator({}));
    db_->begin_write_batch().ensure();
    for (it->Seek(to_rocksdb(from_.as_slice())); it->Valid(); it->Next()) {
      auto key = from_rocksdb(it->key());
      // Cell keys are exactly 32 bytes (a Bits256 hash); anything else is
      // some other record type in the same column family — skip it.
      if (key.size() != 32) {
        LOG(WARNING) << "CellDb: skipping key with size " << key.size();
        continue;
      }
      td::Bits256 hash = td::Bits256(td::ConstBitPtr{key.ubegin()});
      // Keys are iterated in lexicographic order, so the first key >= to_
      // marks the end of this batch's range.
      if (!(hash < to_)) {
        break;
      }

      auto value = from_rocksdb(it->value());
      migrate_cell(hash, value, storer);
    }
    db_->commit_write_batch().ensure();

    LOG(INFO) << "Migrating batch from " << from_.to_hex() << " to " << to_.to_hex() << " done. Migrated " << migrated_ << " cells";
    promise_.set_value(td::Unit());
    stop();
  }

  // Re-stores a single cell if its on-disk representation (plain vs boc)
  // does not match what the new compress depth requires. Load failures are
  // logged and tolerated — the migration is best-effort per cell, so the
  // returned status is always OK.
  td::Status migrate_cell(const td::Bits256& hash, const td::Slice& value, vm::CellStorer& storer) {
    auto R = loader_->load(hash.as_slice(), value, true, boc_->as_ext_cell_creator());
    if (R.is_error()) {
      LOG(WARNING) << "CellDb: failed to load cell: " << R.move_as_error();
      return td::Status::OK();
    }
    if (R.ok().status == vm::CellLoader::LoadResult::NotFound) {
      LOG(WARNING) << "CellDb: cell not found";
      return td::Status::OK();
    }
    // Cells at exactly the compress depth are stored as a serialized boc;
    // rewrite only when the stored flag disagrees with that expectation.
    bool expected_stored_boc = R.ok().cell_->get_depth() == new_compress_depth_;
    if (expected_stored_boc != R.ok().stored_boc_) {
      ++migrated_;
      storer.set(R.ok().refcnt(), R.ok().cell_, expected_stored_boc).ensure();
      LOG(DEBUG) << "Migrating cell " << hash.to_hex();
    }
    return td::Status::OK();
  }
};

class MigrateCellDBActor: public td::actor::Actor {
std::string db_root_;
int new_compress_depth_;
int max_parallel_batches_;

std::shared_ptr<td::RocksDb> db_;

uint32_t migrated_{0};

td::Bits256 current_;

int cur_parallel_batches_{0};
public:
MigrateCellDBActor(std::string db_root, int new_compress_depth, int max_parallel_batches)
: db_root_(db_root), new_compress_depth_(new_compress_depth), max_parallel_batches_(max_parallel_batches) {
td::RocksDbOptions read_db_options;
read_db_options.use_direct_reads = true;
auto db_r = td::RocksDb::open(db_root_ + "/celldb", std::move(read_db_options));
if (db_r.is_error()) {
LOG(FATAL) << "failed to open db: " << db_r.error();
stop();
return;
}
db_ = std::make_shared<td::RocksDb>(db_r.move_as_ok());

current_ = td::Bits256::zero();
}

void start_up() override {
uint64_t count;
db_->raw_db()->GetIntProperty("rocksdb.estimate-num-keys", &count);
LOG(INFO) << "Estimated total number of keys: " << count;

deploy_batches();
}

void deploy_batches() {
using namespace td::literals;
const auto interval_bi = "0000100000000000000000000000000000000000000000000000000000000000"_rx256;
CHECK(interval_bi.not_null());

while (cur_parallel_batches_ < max_parallel_batches_) {
auto current_bi = td::bits_to_refint(current_.bits(), 256, false);
auto to_bi = current_bi + interval_bi;
if (!to_bi->is_valid()) {
LOG(INFO) << "New to_bi is invalid. Stopping.";
return;
}
td::Bits256 to;
std::string to_hex = to_bi->to_hex_string();
if (to_hex.size() < 64) {
to_hex = std::string(64 - to_hex.size(), '0') + to_hex;
}
if (to.from_hex(to_hex) >= 256) {
LOG(INFO) << "New to_bi is too large. Stopping.";
return;
}

auto P = td::PromiseCreator::lambda([SelfId = actor_id(this), current = current_, to = to](td::Result<td::Unit> R) {
if (R.is_error()) {
LOG(ERROR) << "failed to migrate batch from " << current.to_hex() << " to " << to.to_hex() << ": " << R.error();
} else {
LOG(INFO) << "migrated batch from " << current.to_hex() << " to " << to.to_hex();
}
td::actor::send_closure(SelfId, &MigrateCellDBActor::on_batch_migrated);
});
auto db_clone = std::make_shared<td::RocksDb>(db_->clone());
td::actor::create_actor<MigrateBatchActor>("migrate", current_, to, db_clone, new_compress_depth_, std::move(P)).release();
current_ = to;

cur_parallel_batches_++;
}
}

void on_batch_migrated() {
cur_parallel_batches_--;

deploy_batches();

if (cur_parallel_batches_ == 0) {
LOG(INFO) << "Migrated all batches";
stop();
}
}
};

// Entry point: parses --db and --new-celldb-compress-depth, then runs a
// MigrateCellDBActor on a 32-thread scheduler until migration completes.
int main(int argc, char* argv[]) {
  SET_VERBOSITY_LEVEL(verbosity_INFO);
  td::set_default_failure_signal_handler().ensure();

  td::OptionParser p;
  std::string db_root;
  int new_compress_depth = 0;
  p.set_description("Migrate CellDB to another compress db value");
  p.add_option('\0', "help", "prints_help", [&]() {
    // Fix: the slice capacity previously hard-coded 10000 against a
    // 10240-byte buffer; use sizeof so the two can never drift apart.
    char b[10240];
    td::StringBuilder sb(td::MutableSlice{b, sizeof(b)});
    sb << p;
    std::cout << sb.as_cslice().c_str();
    std::exit(2);
  });
  p.add_option('D', "db", "Path to TON DB folder", [&](td::Slice fname) {
    db_root = fname.str();
  });
  p.add_checked_option('\0', "new-celldb-compress-depth", "New value of celldb compress depth", [&](td::Slice fname) {
    int v;
    try {
      v = std::stoi(fname.str());
    } catch (...) {
      return td::Status::Error("bad value for --new-celldb-compress-depth: not a number");
    }
    new_compress_depth = v;
    return td::Status::OK();
  });
  auto S = p.run(argc, argv);
  if (S.is_error()) {
    LOG(ERROR) << "failed to parse options: " << S.move_as_error();
    std::_Exit(2);
  }
  if (db_root.empty()) {
    LOG(ERROR) << "db path is empty";
    std::_Exit(2);
  }
  if (new_compress_depth <= 0) {
    LOG(ERROR) << "new compress depth is invalid";
    std::_Exit(2);
  }

  // 32 scheduler threads; the coordinator also caps itself at 32
  // concurrent batches, so each batch can run on its own thread.
  td::actor::Scheduler scheduler({32});
  scheduler.run_in_context(
      [&] { td::actor::create_actor<MigrateCellDBActor>("migrate", db_root, new_compress_depth, 32).release(); });
  while (scheduler.run(1)) {
  }
  return 0;
}
7 changes: 6 additions & 1 deletion ton-index-postgres-v2/src/IndexScheduler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,12 @@ void IndexScheduler::seqno_fetched(std::uint32_t mc_seqno, MasterchainBlockDataS
}
td::actor::send_closure(SelfId, &IndexScheduler::seqno_parsed, mc_seqno, R.move_as_ok());
});
td::actor::send_closure(parse_manager_, &ParseManager::parse, mc_seqno, std::move(block_data_state), std::move(P));

td::actor::send_closure(db_scanner_, &DbScanner::get_cell_db_reader,
[SelfId = actor_id(this), parse_manager = parse_manager_, mc_seqno, block_data_state, P = std::move(P)](td::Result<std::shared_ptr<vm::CellDbReader>> cell_db_reader) mutable {
CHECK(cell_db_reader.is_ok());
td::actor::send_closure(parse_manager, &ParseManager::parse, mc_seqno, std::move(block_data_state), cell_db_reader.move_as_ok(), std::move(P));
});
}

void IndexScheduler::seqno_parsed(std::uint32_t mc_seqno, ParsedBlockPtr parsed_block) {
Expand Down
61 changes: 25 additions & 36 deletions tondb-scanner/src/DataParser.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -40,11 +40,14 @@ td::Status ParseQuery::parse_impl() {
}

// transactions and messages
std::set<td::Bits256> addresses;
TRY_RESULT_ASSIGN(schema_block.transactions, parse_transactions(block_ds.block_data->block_id(), blk, info, extra, addresses));
std::map<td::Bits256, AccountStateShort> account_states_to_get;
TRY_RESULT_ASSIGN(schema_block.transactions, parse_transactions(block_ds.block_data->block_id(), blk, info, extra, account_states_to_get));

// account states
TRY_STATUS(parse_account_states(block_ds.block_state, addresses));
TRY_RESULT(account_states_fast, parse_account_states_new(schema_block.workchain, schema_block.gen_utime, account_states_to_get));

for (auto &acc : account_states_fast) {
result->account_states_.push_back(acc);
}

// config
if (block_ds.block_data->block_id().is_masterchain()) {
Expand Down Expand Up @@ -532,7 +535,7 @@ td::Result<schema::TransactionDescr> ParseQuery::process_transaction_descr(vm::C

td::Result<std::vector<schema::Transaction>> ParseQuery::parse_transactions(const ton::BlockIdExt& blk_id, const block::gen::Block::Record &block,
const block::gen::BlockInfo::Record &info, const block::gen::BlockExtra::Record &extra,
std::set<td::Bits256> &addresses) {
std::map<td::Bits256, AccountStateShort> &account_states) {
std::vector<schema::Transaction> res;
try {
vm::AugmentedDictionary acc_dict{vm::load_cell_slice_ref(extra.account_blocks), 256, block::tlb::aug_ShardAccountBlocks};
Expand Down Expand Up @@ -615,8 +618,8 @@ td::Result<std::vector<schema::Transaction>> ParseQuery::parse_transactions(cons
TRY_RESULT_ASSIGN(schema_tx.description, process_transaction_descr(descr_cs));

res.push_back(schema_tx);

addresses.insert(cur_addr);
account_states[cur_addr] = {schema_tx.account_state_hash_after, schema_tx.lt, schema_tx.hash};
}
}
} catch (vm::VmError err) {
Expand All @@ -625,50 +628,36 @@ td::Result<std::vector<schema::Transaction>> ParseQuery::parse_transactions(cons
return res;
}

td::Status ParseQuery::parse_account_states(const td::Ref<vm::Cell>& block_state_root, std::set<td::Bits256> &addresses) {
auto root = block_state_root;
block::gen::ShardStateUnsplit::Record sstate;
if (!tlb::unpack_cell(block_state_root, sstate)) {
return td::Status::Error("Failed to unpack ShardStateUnsplit");
}
block::gen::ShardIdent::Record shard_id;
if (!tlb::csr_unpack(sstate.shard_id, shard_id)) {
return td::Status::Error("Failed to unpack ShardIdent");
}
vm::AugmentedDictionary accounts_dict{vm::load_cell_slice_ref(sstate.accounts), 256, block::tlb::aug_ShardAccounts};
for (auto &addr : addresses) {
auto shard_account_csr = accounts_dict.lookup(addr);
if (shard_account_csr.is_null()) {
// account is uninitialized after this block
continue;
}
block::gen::ShardAccount::Record acc_info;
if(!tlb::csr_unpack(std::move(shard_account_csr), acc_info)) {
LOG(ERROR) << "Failed to unpack ShardAccount " << addr;
td::Result<std::vector<schema::AccountState>> ParseQuery::parse_account_states_new(ton::WorkchainId workchain_id, uint32_t gen_utime, std::map<td::Bits256, AccountStateShort> &account_states) {
std::vector<schema::AccountState> res;
res.reserve(account_states.size());
for (auto &[addr, state_short] : account_states) {
auto account_cell_r = cell_db_reader_->load_cell(state_short.account_cell_hash.as_slice());
if (account_cell_r.is_error()) {
LOG(ERROR) << "Failed to load account state cell " << state_short.account_cell_hash.to_hex();
continue;
}
int account_tag = block::gen::t_Account.get_tag(vm::load_cell_slice(acc_info.account));
auto account_cell = account_cell_r.move_as_ok();
int account_tag = block::gen::t_Account.get_tag(vm::load_cell_slice(account_cell));
switch (account_tag) {
case block::gen::Account::account_none: {
auto address = block::StdAddress(shard_id.workchain_id, addr);
TRY_RESULT(account, parse_none_account(std::move(acc_info.account), address, sstate.gen_utime, acc_info.last_trans_hash, acc_info.last_trans_lt));
result->account_states_.push_back(account);
auto address = block::StdAddress(workchain_id, addr);
TRY_RESULT(account, parse_none_account(std::move(account_cell), address, gen_utime, state_short.last_transaction_hash, state_short.last_transaction_lt));
res.push_back(account);
break;
}
case block::gen::Account::account: {
TRY_RESULT(account, parse_account(std::move(acc_info.account), sstate.gen_utime, acc_info.last_trans_hash, acc_info.last_trans_lt));
result->account_states_.push_back(account);
TRY_RESULT(account, parse_account(std::move(account_cell), gen_utime, state_short.last_transaction_hash, state_short.last_transaction_lt));
res.push_back(account);
break;
}
default:
return td::Status::Error("Unknown account tag");
}
}
// LOG(DEBUG) << "Parsed " << result->account_states_.size() << " account states";
return td::Status::OK();
return res;
}

// td::Result<schema::AccountState> ParseQuery::parse_shard_account(td::Ref<vm::CellSlice> account_csr)

td::Result<schema::AccountState> ParseQuery::parse_none_account(td::Ref<vm::Cell> account_root, block::StdAddress address, uint32_t gen_utime, td::Bits256 last_trans_hash, uint64_t last_trans_lt) {
block::gen::Account::Record_account_none account_none;
Expand Down
Loading

0 comments on commit 0614122

Please sign in to comment.