Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Speed up LargeBocSerializer with bulk cells reading #1533

Open
wants to merge 2 commits into
base: testnet
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion crypto/test/test-db.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2318,7 +2318,10 @@ TEST(TonDb, LargeBocSerializer) {
std_boc_serialize_to_file_large(dboc->get_cell_db_reader(), root->get_hash(), fd, 31);
fd.close();
auto b = td::read_file_str(path).move_as_ok();
CHECK(a == b);

auto a_cell = vm::deserialize_boc(td::BufferSlice(a));
auto b_cell = vm::deserialize_boc(td::BufferSlice(b));
ASSERT_EQ(a_cell->get_hash(), b_cell->get_hash());
}

TEST(TonDb, DoNotMakeListsPrunned) {
Expand Down
6 changes: 3 additions & 3 deletions crypto/vm/boc.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -214,7 +214,7 @@ td::Result<int> BagOfCells::import_cell(td::Ref<vm::Cell> cell, int depth) {
return td::Status::Error("error while importing a cell into a bag of cells: cell is null");
}
if (logger_ptr_) {
TRY_STATUS(logger_ptr_->on_cell_processed());
TRY_STATUS(logger_ptr_->on_cells_processed(1));
}
auto it = cells.find(cell->get_hash());
if (it != cells.end()) {
Expand Down Expand Up @@ -560,7 +560,7 @@ td::Result<std::size_t> BagOfCells::serialize_to_impl(WriterT& writer, int mode)
}
store_offset(fixed_offset);
if (logger_ptr_) {
TRY_STATUS(logger_ptr_->on_cell_processed());
TRY_STATUS(logger_ptr_->on_cells_processed(1));
}
}
if (logger_ptr_) {
Expand Down Expand Up @@ -593,7 +593,7 @@ td::Result<std::size_t> BagOfCells::serialize_to_impl(WriterT& writer, int mode)
}
// std::cerr << std::endl;
if (logger_ptr_) {
TRY_STATUS(logger_ptr_->on_cell_processed());
TRY_STATUS(logger_ptr_->on_cells_processed(1));
}
}
writer.chk();
Expand Down
17 changes: 12 additions & 5 deletions crypto/vm/boc.h
Original file line number Diff line number Diff line change
Expand Up @@ -210,22 +210,27 @@ class BagOfCellsLogger {

void start_stage(std::string stage) {
log_speed_at_ = td::Timestamp::in(LOG_SPEED_PERIOD);
last_speed_log_ = td::Timestamp::now();
processed_cells_ = 0;
timer_ = {};
stage_ = std::move(stage);
}
// Logs (at ERROR level) the name of the current stage, the time elapsed on timer_
// and the caller-supplied description `desc`.
void finish_stage(td::Slice desc) {
LOG(ERROR) << "serializer: " << stage_ << " took " << timer_.elapsed() << "s, " << desc;
}
// Single-cell convenience entry point kept for existing callers; counts one cell.
td::Status on_cell_processed() {
  return on_cells_processed(1);
}
// Records `count` newly processed cells for the current stage.
//
// Polls the cancellation token each time the running counter moves into a
// different bucket of 1000 cells and, once the speed-log deadline has passed,
// logs the throughput over the real elapsed window and starts a new window.
// Returns OK unless the cancellation check fails (propagated through TRY_STATUS).
td::Status on_cells_processed(size_t count) {
  processed_cells_ += count;
  // Compare with != rather than >: processed_cells_ is zeroed both below and in
  // start_stage(), and a "greater than" test against a stale watermark would
  // skip cancellation checks until the counter caught up with its old value.
  const size_t token_bucket = processed_cells_ / 1000;
  if (token_bucket != last_token_check_) {
    TRY_STATUS(cancellation_token_.check());
    last_token_check_ = token_bucket;
  }
  if (log_speed_at_.is_in_past()) {
    // Read the clock once so the reported window and the next window's start agree.
    const auto now = td::Timestamp::now();
    const double period = now.at() - last_speed_log_.at();

    LOG(WARNING) << "serializer: " << stage_ << " " << static_cast<double>(processed_cells_) / period << " cells/s";
    processed_cells_ = 0;
    last_token_check_ = 0;  // keep the watermark consistent with the reset counter
    last_speed_log_ = now;
    log_speed_at_ = td::Timestamp::in(LOG_SPEED_PERIOD);
  }
  return td::Status::OK();
}
Expand All @@ -236,6 +241,8 @@ class BagOfCellsLogger {
td::CancellationToken cancellation_token_;
td::Timestamp log_speed_at_;
size_t processed_cells_ = 0;
size_t last_token_check_ = 0;
td::Timestamp last_speed_log_;
static constexpr double LOG_SPEED_PERIOD = 120.0;
};
class BagOfCells {
Expand Down
22 changes: 22 additions & 0 deletions crypto/vm/db/CellStorage.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,28 @@ td::Result<CellLoader::LoadResult> CellLoader::load(td::Slice hash, bool need_da
return res;
}

// Loads several cells with a single multi-get against the underlying reader.
//
// The returned vector has one entry per element of `hashes`, in the same order.
// A key whose status is not Ok yields a default-constructed LoadResult; every
// other value is deserialized with the static load() overload and, when a load
// callback is installed, reported to it. The first reader or deserialization
// error aborts the whole call.
td::Result<std::vector<CellLoader::LoadResult>> CellLoader::load_bulk(td::Span<td::Slice> hashes, bool need_data,
                                                                      ExtCellCreator &ext_cell_creator) {
  std::vector<std::string> serialized;
  TRY_RESULT(statuses, reader_->get_multi(hashes, &serialized));

  const size_t total = hashes.size();
  std::vector<LoadResult> loaded;
  loaded.reserve(total);
  for (size_t idx = 0; idx < total; idx++) {
    if (statuses[idx] == KeyValue::GetStatus::Ok) {
      TRY_RESULT(cell_result, load(hashes[idx], serialized[idx], need_data, ext_cell_creator));
      if (on_load_callback_) {
        on_load_callback_(cell_result);
      }
      loaded.push_back(std::move(cell_result));
    } else {
      // Anything other than Ok is expected to be NotFound.
      DCHECK(statuses[idx] == KeyValue::GetStatus::NotFound);
      loaded.push_back(LoadResult{});
    }
  }
  return loaded;
}

td::Result<CellLoader::LoadResult> CellLoader::load(td::Slice hash, td::Slice value, bool need_data,
ExtCellCreator &ext_cell_creator) {
LoadResult res;
Expand Down
1 change: 1 addition & 0 deletions crypto/vm/db/CellStorage.h
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ class CellLoader {
};
CellLoader(std::shared_ptr<KeyValueReader> reader, std::function<void(const LoadResult &)> on_load_callback = {});
td::Result<LoadResult> load(td::Slice hash, bool need_data, ExtCellCreator &ext_cell_creator);
td::Result<std::vector<LoadResult>> load_bulk(td::Span<td::Slice> hashes, bool need_data, ExtCellCreator &ext_cell_creator);
static td::Result<LoadResult> load(td::Slice hash, td::Slice value, bool need_data, ExtCellCreator &ext_cell_creator);
td::Result<LoadResult> load_refcnt(td::Slice hash); // This only loads refcnt_, cell_ == null

Expand Down
37 changes: 25 additions & 12 deletions crypto/vm/db/DynamicBagOfCellsDb.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -100,22 +100,15 @@ class DynamicBagOfCellsDbImpl : public DynamicBagOfCellsDb, private ExtCellCreat
return get_cell_info_lazy(level_mask, hash, depth).cell;
}
td::Result<Ref<DataCell>> load_cell(td::Slice hash) override {
auto info = hash_table_.get_if_exists(hash);
if (info && info->sync_with_db) {
TRY_RESULT(loaded_cell, info->cell->load_cell());
return std::move(loaded_cell.data_cell);
}
TRY_RESULT(res, loader_->load(hash, true, *this));
if (res.status != CellLoader::LoadResult::Ok) {
return td::Status::Error("cell not found");
}
Ref<DataCell> cell = res.cell();
hash_table_.apply(hash, [&](CellInfo &info) { update_cell_info_loaded(info, hash, std::move(res)); });
return cell;
TRY_RESULT(loaded_cell, get_cell_info_force(hash).cell->load_cell());
return std::move(loaded_cell.data_cell);
}
// Loads a root cell by hash; this implementation simply forwards to load_cell().
td::Result<Ref<DataCell>> load_root(td::Slice hash) override {
return load_cell(hash);
}
// Bulk loading is not provided by this implementation: the call always fails
// with a "Not implemented" error and `hashes` is ignored.
td::Result<std::vector<Ref<DataCell>>> load_bulk(td::Span<td::Slice> hashes) override {
return td::Status::Error("Not implemented");
}
// Thread-safe root loading is not provided by this implementation: the call
// always fails with a "Not implemented" error and `hash` is ignored.
td::Result<Ref<DataCell>> load_root_thread_safe(td::Slice hash) const override {
return td::Status::Error("Not implemented");
}
Expand Down Expand Up @@ -155,6 +148,9 @@ class DynamicBagOfCellsDbImpl : public DynamicBagOfCellsDb, private ExtCellCreat
promise->set_result(std::move(cell));
});
}
// Returns the hash-table entry for `hash`, running update_cell_info_force() on it
// as part of the lookup.
CellInfo &get_cell_info_force(td::Slice hash) {
  auto force_update = [&](CellInfo &entry) { update_cell_info_force(entry, hash); };
  return hash_table_.apply(hash, force_update);
}
// Returns the hash-table entry keyed by the trailing Cell::hash_bytes bytes of
// `hash`, running update_cell_info_lazy() on it with the full `hash`, the level
// mask and `depth`.
CellInfo &get_cell_info_lazy(Cell::LevelMask level_mask, td::Slice hash, td::Slice depth) {
  const auto table_key = hash.substr(hash.size() - Cell::hash_bytes);
  auto lazy_update = [&](CellInfo &entry) { update_cell_info_lazy(entry, level_mask, hash, depth); };
  return hash_table_.apply(table_key, lazy_update);
}
Expand Down Expand Up @@ -334,6 +330,23 @@ class DynamicBagOfCellsDbImpl : public DynamicBagOfCellsDb, private ExtCellCreat
return std::move(load_result.cell());
}

// Loads the data cells for all `hashes`, in order.
//
// When db_ is set the request is forwarded to it unchanged. Otherwise the cells
// are fetched through cell_loader_->load_bulk(); any entry whose status is not Ok
// makes the whole call fail with "cell not found".
td::Result<std::vector<Ref<DataCell>>> load_bulk(td::Span<td::Slice> hashes) override {
  if (db_) {
    return db_->load_bulk(hashes);
  }
  TRY_RESULT(loaded, cell_loader_->load_bulk(hashes, true, *this));

  const size_t total = loaded.size();
  std::vector<Ref<DataCell>> cells;
  cells.reserve(total);
  for (size_t idx = 0; idx < total; idx++) {
    auto &entry = loaded[idx];
    if (entry.status != CellLoader::LoadResult::Ok) {
      return td::Status::Error("cell not found");
    }
    cells.push_back(std::move(entry.cell()));
  }
  return cells;
}

private:
static td::NamedThreadSafeCounter::CounterRef get_thread_safe_counter() {
static auto res = td::NamedThreadSafeCounter::get_default().get_counter("DynamicBagOfCellsDbLoader");
Expand Down
2 changes: 2 additions & 0 deletions crypto/vm/db/DynamicBagOfCellsDb.h
Original file line number Diff line number Diff line change
Expand Up @@ -44,12 +44,14 @@ class CellDbReader {
public:
virtual ~CellDbReader() = default;
virtual td::Result<Ref<DataCell>> load_cell(td::Slice hash) = 0;
virtual td::Result<std::vector<Ref<DataCell>>> load_bulk(td::Span<td::Slice> hashes) = 0;
};

class DynamicBagOfCellsDb {
public:
virtual ~DynamicBagOfCellsDb() = default;
virtual td::Result<Ref<DataCell>> load_cell(td::Slice hash) = 0;
virtual td::Result<std::vector<Ref<DataCell>>> load_bulk(td::Span<td::Slice> hashes) = 0;
virtual td::Result<Ref<DataCell>> load_root(td::Slice hash) = 0;
virtual td::Result<Ref<DataCell>> load_root_thread_safe(td::Slice hash) const = 0;
struct Stats {
Expand Down
13 changes: 13 additions & 0 deletions crypto/vm/db/InMemoryBagOfCellsDb.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -461,6 +461,16 @@ class CellStorage {
return td::Status::Error("not found");
}

// Resolves every hash in `hashes` through load_cell(), preserving order.
// The first hash that load_cell() cannot resolve aborts the call with its error.
td::Result<std::vector<Ref<DataCell>>> load_bulk(td::Span<CellHash> hashes) const {
  const size_t total = hashes.size();
  std::vector<Ref<DataCell>> cells;
  cells.reserve(total);
  for (size_t idx = 0; idx < total; idx++) {
    TRY_RESULT(loaded, load_cell(hashes[idx]));
    cells.push_back(std::move(loaded));
  }
  return cells;
}

td::Result<Ref<DataCell>> load_root_local(const CellHash &hash) const {
auto lock = local_access_.lock();
if (auto it = local_roots_.find(hash); it != local_roots_.end()) {
Expand Down Expand Up @@ -769,6 +779,9 @@ class InMemoryBagOfCellsDb : public DynamicBagOfCellsDb {
// Converts `hash` to a CellHash and looks the root up via storage_->load_root_local().
td::Result<Ref<DataCell>> load_root(td::Slice hash) override {
return storage_->load_root_local(CellHash::from_slice(hash));
}
// Converts each slice in `hashes` to a CellHash and delegates the bulk lookup to storage_.
td::Result<std::vector<Ref<DataCell>>> load_bulk(td::Span<td::Slice> hashes) override {
  auto cell_hashes = td::transform(hashes, [](auto &hash) { return CellHash::from_slice(hash); });
  return storage_->load_bulk(cell_hashes);
}
// Converts `hash` to a CellHash and looks the root up via storage_->load_root_shared().
td::Result<Ref<DataCell>> load_root_thread_safe(td::Slice hash) const override {
return storage_->load_root_shared(CellHash::from_slice(hash));
}
Expand Down
Loading