Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add state archival getledgerentry endpoint #4623

Draft
wants to merge 4 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion src/Makefile.am
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,8 @@ TEST_FILES = $(TESTDATA_DIR)/stellar-core_example.cfg $(TESTDATA_DIR)/stellar-co
$(TESTDATA_DIR)/stellar-core_testnet.cfg $(TESTDATA_DIR)/stellar-core_testnet_legacy.cfg \
$(TESTDATA_DIR)/stellar-history.testnet.6714239.json $(TESTDATA_DIR)/stellar-history.livenet.15686975.json \
$(TESTDATA_DIR)/stellar-core_testnet_validator.cfg $(TESTDATA_DIR)/stellar-core_example_validators.cfg \
$(TESTDATA_DIR)/stellar-history.testnet.6714239.networkPassphrase.json
$(TESTDATA_DIR)/stellar-history.testnet.6714239.networkPassphrase.json \
$(TESTDATA_DIR)/stellar-history.testnet.6714239.networkPassphrase.v2.json

BUILT_SOURCES = $(SRC_X_FILES:.x=.h) main/StellarCoreVersion.cpp main/XDRFilesSha256.cpp $(TEST_FILES)

Expand Down
2 changes: 1 addition & 1 deletion src/bucket/BucketBase.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -385,7 +385,7 @@ BucketBase::merge(BucketManager& bucketManager, uint32_t maxProtocolVersion,
}
if (countMergeEvents)
{
bucketManager.incrMergeCounters(mc);
bucketManager.incrMergeCounters<BucketT>(mc);
}

std::vector<Hash> shadowHashes;
Expand Down
182 changes: 121 additions & 61 deletions src/bucket/BucketManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -330,18 +330,36 @@ BucketManager::getMergeTimer()
return mBucketSnapMerge;
}

template <>
MergeCounters
BucketManager::readMergeCounters<LiveBucket>()
{
std::lock_guard<std::recursive_mutex> lock(mBucketMutex);
return mLiveMergeCounters;
}

template <>
MergeCounters
BucketManager::readMergeCounters()
BucketManager::readMergeCounters<HotArchiveBucket>()
{
std::lock_guard<std::recursive_mutex> lock(mBucketMutex);
return mHotArchiveMergeCounters;
}

template <>
void
BucketManager::incrMergeCounters<LiveBucket>(MergeCounters const& delta)
{
std::lock_guard<std::recursive_mutex> lock(mBucketMutex);
return mMergeCounters;
mLiveMergeCounters += delta;
}

template <>
void
BucketManager::incrMergeCounters(MergeCounters const& delta)
BucketManager::incrMergeCounters<HotArchiveBucket>(MergeCounters const& delta)
{
std::lock_guard<std::recursive_mutex> lock(mBucketMutex);
mMergeCounters += delta;
mHotArchiveMergeCounters += delta;
}

bool
Expand Down Expand Up @@ -623,7 +641,7 @@ BucketManager::getMergeFutureInternal(MergeKey const& key,
auto future = promise.get_future().share();
promise.set_value(bucket);
mc.mFinishedMergeReattachments++;
incrMergeCounters(mc);
incrMergeCounters<BucketT>(mc);
return future;
}
}
Expand All @@ -638,7 +656,7 @@ BucketManager::getMergeFutureInternal(MergeKey const& key,
"BucketManager::getMergeFuture returning running future for merge {}",
key);
mc.mRunningMergeReattachments++;
incrMergeCounters(mc);
incrMergeCounters<BucketT>(mc);
return i->second;
}

Expand Down Expand Up @@ -1013,10 +1031,10 @@ BucketManager::snapshotLedger(LedgerHeader& currentHeader)
currentHeader.ledgerVersion,
BucketBase::FIRST_PROTOCOL_SUPPORTING_PERSISTENT_EVICTION))
{
// TODO: Hash Archive Bucket
// Dependency: HAS supports Hot Archive BucketList

hash = mLiveBucketList->getHash();
SHA256 hsh;
hsh.add(mLiveBucketList->getHash());
hsh.add(mHotArchiveBucketList->getHash());
hash = hsh.finish();
}
else
{
Expand Down Expand Up @@ -1061,15 +1079,17 @@ BucketManager::startBackgroundEvictionScan(uint32_t ledgerSeq,
mSnapshotManager->copySearchableLiveBucketListSnapshot();
auto const& sas = cfg.stateArchivalSettings();

using task_t = std::packaged_task<EvictionResultCandidates()>;
using task_t =
std::packaged_task<std::unique_ptr<EvictionResultCandidates>()>;
// MSVC gotcha: searchableBL has to be shared_ptr because MSVC wants to
// copy this lambda, otherwise we could use unique_ptr.
auto task = std::make_shared<task_t>(
[bl = std::move(searchableBL), iter = cfg.evictionIterator(), ledgerSeq,
ledgerVers, sas, &counters = mBucketListEvictionCounters,
stats = mEvictionStatistics] {
return bl->scanForEviction(ledgerSeq, counters, iter, stats, sas,
ledgerVers);
return std::make_unique<EvictionResultCandidates>(
bl->scanForEviction(ledgerSeq, counters, iter, stats, sas,
ledgerVers));
});

mEvictionFuture = task->get_future();
Expand All @@ -1096,14 +1116,14 @@ BucketManager::resolveBackgroundEvictionScan(

// If eviction related settings changed during the ledger, we have to
// restart the scan
if (!evictionCandidates.isValid(ledgerSeq,
networkConfig.stateArchivalSettings()))
if (!evictionCandidates->isValid(ledgerSeq, ledgerVers,
networkConfig.stateArchivalSettings()))
{
startBackgroundEvictionScan(ledgerSeq, ledgerVers, networkConfig);
evictionCandidates = mEvictionFuture.get();
}

auto& eligibleEntries = evictionCandidates.eligibleEntries;
auto& eligibleEntries = evictionCandidates->eligibleEntries;

for (auto iter = eligibleEntries.begin(); iter != eligibleEntries.end();)
{
Expand All @@ -1121,7 +1141,7 @@ BucketManager::resolveBackgroundEvictionScan(
auto remainingEntriesToEvict =
networkConfig.stateArchivalSettings().maxEntriesToArchive;
auto entryToEvictIter = eligibleEntries.begin();
auto newEvictionIterator = evictionCandidates.endOfRegionIterator;
auto newEvictionIterator = evictionCandidates->endOfRegionIterator;

// Return vectors include both evicted entry and associated TTL
std::vector<LedgerKey> deletedKeys;
Expand Down Expand Up @@ -1161,7 +1181,7 @@ BucketManager::resolveBackgroundEvictionScan(
// region
if (remainingEntriesToEvict != 0)
{
newEvictionIterator = evictionCandidates.endOfRegionIterator;
newEvictionIterator = evictionCandidates->endOfRegionIterator;
}

networkConfig.updateEvictionIterator(ltx, newEvictionIterator);
Expand Down Expand Up @@ -1229,51 +1249,71 @@ BucketManager::assumeState(HistoryArchiveState const& has,
releaseAssert(threadIsMain());
releaseAssertOrThrow(mConfig.MODE_ENABLES_BUCKETLIST);

// TODO: Assume archival bucket state
// Dependency: HAS supports Hot Archive BucketList
for (uint32_t i = 0; i < LiveBucketList::kNumLevels; ++i)
{
auto curr = getBucketByHashInternal(
hexToBin256(has.currentBuckets.at(i).curr), mSharedLiveBuckets);
auto snap = getBucketByHashInternal(
hexToBin256(has.currentBuckets.at(i).snap), mSharedLiveBuckets);
if (!(curr && snap))
{
throw std::runtime_error("Missing bucket files while assuming "
"saved live BucketList state");
}

auto const& nextFuture = has.currentBuckets.at(i).next;
std::shared_ptr<LiveBucket> nextBucket = nullptr;
if (nextFuture.hasOutputHash())
auto processBucketList = [&](auto& bl, auto const& hasBuckets) {
auto kNumLevels = std::remove_reference<decltype(bl)>::type::kNumLevels;
using BucketT =
typename std::remove_reference<decltype(bl)>::type::bucket_type;
for (uint32_t i = 0; i < kNumLevels; ++i)
{
nextBucket = getBucketByHashInternal(
hexToBin256(nextFuture.getOutputHash()), mSharedLiveBuckets);
if (!nextBucket)
auto curr =
getBucketByHash<BucketT>(hexToBin256(hasBuckets.at(i).curr));
auto snap =
getBucketByHash<BucketT>(hexToBin256(hasBuckets.at(i).snap));
if (!(curr && snap))
{
throw std::runtime_error(
"Missing future bucket files while "
"assuming saved live BucketList state");
throw std::runtime_error("Missing bucket files while assuming "
"saved live BucketList state");
}
}

// Buckets on the BucketList should always be indexed
releaseAssert(curr->isEmpty() || curr->isIndexed());
releaseAssert(snap->isEmpty() || snap->isIndexed());
if (nextBucket)
{
releaseAssert(nextBucket->isEmpty() || nextBucket->isIndexed());
auto const& nextFuture = hasBuckets.at(i).next;
std::shared_ptr<BucketT> nextBucket = nullptr;
if (nextFuture.hasOutputHash())
{
nextBucket = getBucketByHash<BucketT>(
hexToBin256(nextFuture.getOutputHash()));
if (!nextBucket)
{
throw std::runtime_error(
"Missing future bucket files while "
"assuming saved live BucketList state");
}
}

// Buckets on the BucketList should always be indexed
releaseAssert(curr->isEmpty() || curr->isIndexed());
releaseAssert(snap->isEmpty() || snap->isIndexed());
if (nextBucket)
{
releaseAssert(nextBucket->isEmpty() || nextBucket->isIndexed());
}

bl.getLevel(i).setCurr(curr);
bl.getLevel(i).setSnap(snap);
bl.getLevel(i).setNext(nextFuture);
}
};

mLiveBucketList->getLevel(i).setCurr(curr);
mLiveBucketList->getLevel(i).setSnap(snap);
mLiveBucketList->getLevel(i).setNext(nextFuture);
processBucketList(*mLiveBucketList, has.currentBuckets);
#ifdef ENABLE_NEXT_PROTOCOL_VERSION_UNSAFE_FOR_PRODUCTION
if (has.hasHotArchiveBuckets())
{
processBucketList(*mHotArchiveBucketList, has.hotArchiveBuckets);
}
#endif

if (restartMerges)
{
mLiveBucketList->restartMerges(mApp, maxProtocolVersion,
has.currentLedger);
#ifdef ENABLE_NEXT_PROTOCOL_VERSION_UNSAFE_FOR_PRODUCTION
if (has.hasHotArchiveBuckets())
{
mHotArchiveBucketList->restartMerges(mApp, maxProtocolVersion,
has.currentLedger);
}
#endif
}
cleanupStaleFiles(has);
}
Expand Down Expand Up @@ -1349,7 +1389,8 @@ BucketManager::loadCompleteLedgerState(HistoryArchiveState const& has)
std::vector<std::pair<Hash, std::string>> hashes;
for (uint32_t i = LiveBucketList::kNumLevels; i > 0; --i)
{
HistoryStateBucket const& hsb = has.currentBuckets.at(i - 1);
HistoryStateBucket<LiveBucket> const& hsb =
has.currentBuckets.at(i - 1);
hashes.emplace_back(hexToBin256(hsb.snap),
fmt::format(FMT_STRING("snap {:d}"), i - 1));
hashes.emplace_back(hexToBin256(hsb.curr),
Expand Down Expand Up @@ -1526,7 +1567,7 @@ BucketManager::visitLedgerEntries(
std::vector<std::pair<Hash, std::string>> hashes;
for (uint32_t i = 0; i < LiveBucketList::kNumLevels; ++i)
{
HistoryStateBucket const& hsb = has.currentBuckets.at(i);
HistoryStateBucket<LiveBucket> const& hsb = has.currentBuckets.at(i);
hashes.emplace_back(hexToBin256(hsb.curr),
fmt::format(FMT_STRING("curr {:d}"), i));
hashes.emplace_back(hexToBin256(hsb.snap),
Expand Down Expand Up @@ -1579,16 +1620,35 @@ BucketManager::scheduleVerifyReferencedBucketsWork(
continue;
}

// TODO: Update verify to for ArchiveBucket
// Dependency: HAS supports Hot Archive BucketList
auto b = getBucketByHashInternal(h, mSharedLiveBuckets);
if (!b)
{
throw std::runtime_error(fmt::format(
FMT_STRING("Missing referenced bucket {}"), binToHex(h)));
}
seq.emplace_back(std::make_shared<VerifyBucketWork>(
mApp, b->getFilename().string(), b->getHash(), nullptr));
auto loadFilenameAndHash = [&]() -> std::pair<std::string, Hash> {
auto live = getBucketByHashInternal(h, mSharedLiveBuckets);
if (!live)
{
auto hot = getBucketByHashInternal(h, mSharedHotArchiveBuckets);

// Check both live and hot archive buckets for hash. If we don't
// find it in either, we're missing a bucket. Note that live and
// hot archive buckets are guaranteed to have no hash collisions
// due to type field in MetaEntry.
if (!hot)
{
throw std::runtime_error(
fmt::format(FMT_STRING("Missing referenced bucket {}"),
binToHex(h)));
}
return std::make_pair(hot->getFilename().string(),
hot->getHash());
}
else
{
return std::make_pair(live->getFilename().string(),
live->getHash());
}
};

auto [filename, hash] = loadFilenameAndHash();
seq.emplace_back(
std::make_shared<VerifyBucketWork>(mApp, filename, hash, nullptr));
}
return mApp.getWorkScheduler().scheduleWork<WorkSequence>(
"verify-referenced-buckets", seq);
Expand Down
9 changes: 5 additions & 4 deletions src/bucket/BucketManager.h
Original file line number Diff line number Diff line change
Expand Up @@ -103,14 +103,15 @@ class BucketManager : NonMovableOrCopyable
medida::Counter& mLiveBucketListSizeCounter;
medida::Counter& mArchiveBucketListSizeCounter;
EvictionCounters mBucketListEvictionCounters;
MergeCounters mMergeCounters;
MergeCounters mLiveMergeCounters;
MergeCounters mHotArchiveMergeCounters;
std::shared_ptr<EvictionStatistics> mEvictionStatistics{};
std::map<LedgerEntryTypeAndDurability, medida::Counter&>
mBucketListEntryCountCounters;
std::map<LedgerEntryTypeAndDurability, medida::Counter&>
mBucketListEntrySizeCounters;

std::future<EvictionResultCandidates> mEvictionFuture{};
std::future<std::unique_ptr<EvictionResultCandidates>> mEvictionFuture{};

// Copy app's config for thread-safe access
Config const mConfig;
Expand Down Expand Up @@ -203,8 +204,8 @@ class BucketManager : NonMovableOrCopyable

// Reading and writing the merge counters is done in bulk, and takes a lock
// briefly; this can be done from any thread.
MergeCounters readMergeCounters();
void incrMergeCounters(MergeCounters const& delta);
template <class BucketT> MergeCounters readMergeCounters();
template <class BucketT> void incrMergeCounters(MergeCounters const& delta);

// Get a reference to a persistent bucket (in the BucketManager's bucket
// directory), from the BucketManager's shared bucket-set.
Expand Down
Loading
Loading