diff --git a/src/bucket/BucketSnapshotManager.cpp b/src/bucket/BucketSnapshotManager.cpp index aaa85a3e44..8eb7345b08 100644 --- a/src/bucket/BucketSnapshotManager.cpp +++ b/src/bucket/BucketSnapshotManager.cpp @@ -115,6 +115,17 @@ BucketSnapshotManager::recordBulkLoadMetrics(std::string const& label, return iter->second; } +namespace +{ +template +bool +needsUpdate(std::shared_ptr const& snapshot, + SnapshotPtrT const& curr) +{ + return !snapshot || snapshot->getLedgerSeq() < curr->getLedgerSeq(); +} +} + void BucketSnapshotManager::maybeCopySearchableBucketListSnapshot( SearchableSnapshotConstPtr& snapshot) @@ -123,9 +134,7 @@ BucketSnapshotManager::maybeCopySearchableBucketListSnapshot( // modified. Rather, a thread is checking it's copy against the canonical // snapshot, so use a shared lock. std::shared_lock lock(mSnapshotMutex); - - if (!snapshot || - snapshot->getLedgerSeq() < mCurrLiveSnapshot->getLedgerSeq()) + if (needsUpdate(snapshot, mCurrLiveSnapshot)) { snapshot = copySearchableLiveBucketListSnapshot(); } @@ -139,14 +148,33 @@ BucketSnapshotManager::maybeCopySearchableHotArchiveBucketListSnapshot( // modified. Rather, a thread is checking it's copy against the canonical // snapshot, so use a shared lock. std::shared_lock lock(mSnapshotMutex); - - if (!snapshot || - snapshot->getLedgerSeq() < mCurrHotArchiveSnapshot->getLedgerSeq()) + if (needsUpdate(snapshot, mCurrHotArchiveSnapshot)) { snapshot = copySearchableHotArchiveBucketListSnapshot(); } } +void +BucketSnapshotManager::maybeCopyLiveAndHotArchiveSnapshots( + SearchableSnapshotConstPtr& liveSnapshot, + SearchableHotArchiveSnapshotConstPtr& hotArchiveSnapshot) +{ + // The canonical snapshot held by the BucketSnapshotManager is not being + // modified. Rather, a thread is checking it's copy against the canonical + // snapshot, so use a shared lock. For consistency we hold the lock while + // updating both snapshots. + std::shared_lock lock(mSnapshotMutex); + if (needsUpdate(liveSnapshot, mCurrLiveSnapshot)) + { + liveSnapshot = copySearchableLiveBucketListSnapshot(); + } + + if (needsUpdate(hotArchiveSnapshot, mCurrHotArchiveSnapshot)) + { + hotArchiveSnapshot = copySearchableHotArchiveBucketListSnapshot(); + } +} + void BucketSnapshotManager::updateCurrentSnapshot( SnapshotPtrT&& liveSnapshot, diff --git a/src/bucket/BucketSnapshotManager.h b/src/bucket/BucketSnapshotManager.h index 948a7c0ee0..ddedc01d21 100644 --- a/src/bucket/BucketSnapshotManager.h +++ b/src/bucket/BucketSnapshotManager.h @@ -99,6 +99,13 @@ class BucketSnapshotManager : NonMovableOrCopyable void maybeCopySearchableHotArchiveBucketListSnapshot( SearchableHotArchiveSnapshotConstPtr& snapshot); + // This function is the same as snapshot refreshers above, but guarantees + // that both snapshots are consistent with the same lcl. This is required + // when querying both snapshot types as part of the same query. + void maybeCopyLiveAndHotArchiveSnapshots( + SearchableSnapshotConstPtr& liveSnapshot, + SearchableHotArchiveSnapshotConstPtr& hotArchiveSnapshot); + // All metric recording functions must only be called by the main thread void startPointLoadTimer() const; void endPointLoadTimer(LedgerEntryType t, bool bloomMiss) const; diff --git a/src/main/QueryServer.cpp b/src/main/QueryServer.cpp index 13feb0a77b..0a246826aa 100644 --- a/src/main/QueryServer.cpp +++ b/src/main/QueryServer.cpp @@ -6,8 +6,10 @@ #include "bucket/BucketSnapshotManager.h" #include "bucket/SearchableBucketList.h" #include "ledger/LedgerTxnImpl.h" +#include "ledger/LedgerTypeUtils.h" #include "util/Logging.h" #include "util/XDRStream.h" // IWYU pragma: keep +#include "util/types.h" #include #include @@ -54,7 +56,12 @@ namespace stellar { QueryServer::QueryServer(const std::string& address, unsigned short port, int maxClient, size_t threadPoolSize, - BucketSnapshotManager& bucketSnapshotManager) + BucketSnapshotManager& bucketSnapshotManager +#ifdef BUILD_TESTS + , + bool useMainThreadForTesting +#endif + ) : mServer(address, port, maxClient, threadPoolSize) , mBucketSnapshotManager(bucketSnapshotManager) { @@ -63,12 +70,28 @@ QueryServer::QueryServer(const std::string& address, unsigned short port, mServer.add404(std::bind(&QueryServer::notFound, this, _1, _2, _3)); addRoute("getledgerentryraw", &QueryServer::getLedgerEntryRaw); + addRoute("getledgerentry", &QueryServer::getLedgerEntry); - auto workerPids = mServer.start(); - for (auto pid : workerPids) +#ifdef BUILD_TESTS + if (useMainThreadForTesting) { - mBucketListSnapshots[pid] = std::move( - bucketSnapshotManager.copySearchableLiveBucketListSnapshot()); + mBucketListSnapshots[std::this_thread::get_id()] = + bucketSnapshotManager.copySearchableLiveBucketListSnapshot(); + mHotArchiveBucketListSnapshots[std::this_thread::get_id()] = + bucketSnapshotManager.copySearchableHotArchiveBucketListSnapshot(); + } + else +#endif + { + auto workerPids = mServer.start(); + for (auto pid : workerPids) + { + mBucketListSnapshots[pid] = + bucketSnapshotManager.copySearchableLiveBucketListSnapshot(); + mHotArchiveBucketListSnapshots[pid] = + bucketSnapshotManager + .copySearchableHotArchiveBucketListSnapshot(); + } } } @@ -190,4 +213,196 @@ QueryServer::getLedgerEntryRaw(std::string const& params, retStr = Json::FastWriter().write(root); return true; } + +// This query needs to load all the given ledger entries and their "state" +// (live, archived, evicted, new). This requires a loading entry and TTL from +// the live BucketList and then checking the Hot Archive for any keys we didn't +// find. We do three passes: +// 1. Load all keys from the live BucketList +// 2. For any Soroban keys not in the live BucketList, load them from the Hot +// Archive +// 3. Load TTL keys for any live Soroban entries found in 1. +bool +QueryServer::getLedgerEntry(std::string const& params, std::string const& body, + std::string& retStr) +{ + ZoneScoped; + Json::Value root; + + std::map> paramMap; + httpThreaded::server::server::parsePostParams(body, paramMap); + + auto keys = paramMap["key"]; + auto snapshotLedger = parseOptionalParam(paramMap, "ledgerSeq"); + + if (keys.empty()) + { + throw std::invalid_argument( + "Must specify ledger key in POST body: key="); + } + + // Get snapshots for both live and hot archive bucket lists + auto& liveBl = mBucketListSnapshots.at(std::this_thread::get_id()); + auto& hotArchiveBl = + mHotArchiveBucketListSnapshots.at(std::this_thread::get_id()); + + // orderedNotFoundKeys is a set of keys we have not yet found (not in live + // BucketList or in an archived state in the Hot Archive) + LedgerKeySet orderedNotFoundKeys; + for (auto const& key : keys) + { + LedgerKey k; + fromOpaqueBase64(k, key); + + // Check for TTL keys which are not allowed + if (k.type() == TTL) + { + retStr = "TTL keys are not allowed"; + return false; + } + + orderedNotFoundKeys.emplace(k); + } + + mBucketSnapshotManager.maybeCopyLiveAndHotArchiveSnapshots(liveBl, + hotArchiveBl); + + std::vector liveEntries; + std::vector archivedEntries; + uint32_t ledgerSeq = + snapshotLedger ? *snapshotLedger : liveBl->getLedgerSeq(); + root["ledgerSeq"] = ledgerSeq; + + auto liveEntriesOp = + liveBl->loadKeysFromLedger(orderedNotFoundKeys, ledgerSeq); + + // Return 404 if ledgerSeq not found + if (!liveEntriesOp) + { + retStr = "LedgerSeq not found"; + return false; + } + + liveEntries = std::move(*liveEntriesOp); + + // Remove keys found in live bucketList + for (auto const& le : liveEntries) + { + orderedNotFoundKeys.erase(LedgerEntryKey(le)); + } + + LedgerKeySet hotArchiveKeysToSearch; + for (auto const& lk : orderedNotFoundKeys) + { + if (isSorobanEntry(lk)) + { + hotArchiveKeysToSearch.emplace(lk); + } + } + + // Only query archive for remaining keys + if (!hotArchiveKeysToSearch.empty()) + { + auto archivedEntriesOp = + hotArchiveBl->loadKeysFromLedger(hotArchiveKeysToSearch, ledgerSeq); + if (!archivedEntriesOp) + { + retStr = "LedgerSeq not found"; + return false; + } + archivedEntries = std::move(*archivedEntriesOp); + } + + // Collect TTL keys for Soroban entries in the live BucketList + LedgerKeySet ttlKeys; + for (auto const& le : liveEntries) + { + if (isSorobanEntry(le.data)) + { + ttlKeys.emplace(getTTLKey(le)); + } + } + + std::vector ttlEntries; + if (!ttlKeys.empty()) + { + // We haven't updated the live snapshot so we will never not have the + // requested ledgerSeq and return nullopt. + ttlEntries = + std::move(liveBl->loadKeysFromLedger(ttlKeys, ledgerSeq).value()); + } + + std::unordered_map ttlMap; + for (auto const& ttlEntry : ttlEntries) + { + ttlMap.emplace(LedgerEntryKey(ttlEntry), ttlEntry); + } + + // Process live entries + for (auto const& le : liveEntries) + { + Json::Value entry; + entry["e"] = toOpaqueBase64(le); + + // Check TTL for Soroban entries + if (isSorobanEntry(le.data)) + { + auto ttlIter = ttlMap.find(getTTLKey(le)); + releaseAssertOrThrow(ttlIter != ttlMap.end()); + if (isLive(ttlIter->second, ledgerSeq)) + { + entry["state"] = "live"; + entry["ttl"] = ttlIter->second.data.ttl().liveUntilLedgerSeq; + } + else + { + entry["state"] = "archived"; + } + } + else + { + entry["state"] = "live"; + } + + root["entries"].append(entry); + } + + // Process archived entries - all are evicted since they come from hot + // archive + for (auto const& be : archivedEntries) + { + // If we get to this point, we know the key is not in the live + // BucketList, so if we get a DELETED or RESTORED entry, the entry is + // new wrt ledger state. + if (be.type() != HOT_ARCHIVE_ARCHIVED) + { + continue; + } + + auto const& le = be.archivedEntry(); + + // At this point we've "found" the key and know it's archived, so remove + // it from our search set + orderedNotFoundKeys.erase(LedgerEntryKey(le)); + + Json::Value entry; + entry["e"] = toOpaqueBase64(le); + entry["state"] = "evicted"; + root["entries"].append(entry); + } + + // Since we removed entries found in the live BucketList and archived + // entries found in the Hot Archive, any remaining keys must be new. + for (auto const& key : orderedNotFoundKeys) + { + Json::Value entry; + entry["e"] = toOpaqueBase64(key); + entry["state"] = "new"; + root["entries"].append(entry); + } + + retStr = Json::FastWriter().write(root); + return true; +} } \ No newline at end of file diff --git a/src/main/QueryServer.h b/src/main/QueryServer.h index 10d88f6401..b45ba81713 100644 --- a/src/main/QueryServer.h +++ b/src/main/QueryServer.h @@ -29,6 +29,9 @@ class QueryServer std::unordered_map mBucketListSnapshots; + std::unordered_map + mHotArchiveBucketListSnapshots; + BucketSnapshotManager& mBucketSnapshotManager; bool safeRouter(HandlerRoute route, std::string const& params, @@ -39,14 +42,25 @@ class QueryServer void addRoute(std::string const& name, HandlerRoute route); +#ifdef BUILD_TESTS + public: +#endif // Returns raw LedgerKeys for the given keys from the Live BucketList. Does // not query other BucketLists or reason about archival. bool getLedgerEntryRaw(std::string const& params, std::string const& body, std::string& retStr); + bool getLedgerEntry(std::string const& params, std::string const& body, + std::string& retStr); + public: QueryServer(const std::string& address, unsigned short port, int maxClient, size_t threadPoolSize, - BucketSnapshotManager& bucketSnapshotManager); + BucketSnapshotManager& bucketSnapshotManager +#ifdef BUILD_TESTS + , + bool useMainThreadForTesting = false +#endif + ); }; -} \ No newline at end of file +} diff --git a/src/main/test/QueryServerTests.cpp b/src/main/test/QueryServerTests.cpp new file mode 100644 index 0000000000..ef5c9c1af3 --- /dev/null +++ b/src/main/test/QueryServerTests.cpp @@ -0,0 +1,248 @@ +// Copyright 2025 Stellar Development Foundation and contributors. Licensed +// under the Apache License, Version 2.0. See the COPYING file at the root +// of this distribution or at http://www.apache.org/licenses/LICENSE-2.0 + +#include "bucket/BucketManager.h" +#include "bucket/test/BucketTestUtils.h" +#include "ledger/LedgerTxnImpl.h" +#include "ledger/LedgerTypeUtils.h" +#include "ledger/test/LedgerTestUtils.h" +#include "lib/catch.hpp" +#include "main/Application.h" +#include "main/Config.h" +#include "main/QueryServer.h" +#include "test/test.h" +#include "util/Math.h" +#include + +using namespace stellar; + +// TODO: Better testing of errors, edge cases like an entry being in both live +// and archive bl, etc. +TEST_CASE("getledgerentry", "[queryserver]") +{ + VirtualClock clock; + auto cfg = getTestConfig(); + cfg.QUERY_SNAPSHOT_LEDGERS = 5; + + auto app = createTestApplication( + clock, cfg); + auto& lm = app->getLedgerManager(); + + // Query Server is disabled by default in cfg. Instead of enabling it, we're + // going to manage a versio here manually so we can directly call functions + // and avoid sending network requests. + auto qServer = std::make_unique( + "127.0.0.1", // Address + 0, // port (0 = random) + 1, // maxClient + 1, // threadPoolSize + app->getBucketManager().getBucketSnapshotManager(), true); + + std::unordered_map liveEntryMap; + + // Map code/data lk -> ttl value + std::unordered_map liveTTLEntryMap; + std::unordered_map archivedEntryMap; + std::unordered_map evictedEntryMap; + + // Create some test entries + for (auto i = 0; i < 15; ++i) + { + auto lcl = app->getLedgerManager().getLastClosedLedgerNum(); + auto liveEntries = + LedgerTestUtils::generateValidUniqueLedgerEntriesWithTypes( + {CONTRACT_DATA, CONTRACT_CODE, ACCOUNT}, 5); + + std::vector liveEntriesToInsert; + for (auto const& le : liveEntries) + { + if (isSorobanEntry(le.data)) + { + LedgerEntry ttl; + ttl.data.type(TTL); + ttl.data.ttl().keyHash = getTTLKey(le).ttl().keyHash; + + // Make half of the entries archived on the live BL + if (rand_flip()) + { + ttl.data.ttl().liveUntilLedgerSeq = lcl + 100; + } + else + { + ttl.data.ttl().liveUntilLedgerSeq = 0; + } + liveTTLEntryMap[LedgerEntryKey(le)] = + ttl.data.ttl().liveUntilLedgerSeq; + liveEntriesToInsert.push_back(ttl); + } + + liveEntriesToInsert.push_back(le); + liveEntryMap[LedgerEntryKey(le)] = le; + } + + auto archivedEntries = + LedgerTestUtils::generateValidUniqueLedgerEntriesWithTypes( + {CONTRACT_DATA, CONTRACT_CODE}, 5); + for (auto const& le : archivedEntries) + { + archivedEntryMap[LedgerEntryKey(le)] = le; + } + + lm.setNextLedgerEntryBatchForBucketTesting({}, liveEntriesToInsert, {}); + lm.setNextArchiveBatchForBucketTesting(archivedEntries, {}, {}); + closeLedger(*app); + } + + // Lambda to build request body + auto buildRequestBody = + [](std::optional ledgerSeq, + std::vector const& keys) -> std::string { + std::string body; + if (ledgerSeq) + { + body = "ledgerSeq=" + std::to_string(*ledgerSeq); + } + + for (auto const& key : keys) + { + body += (body.empty() ? "" : "&") + std::string("key=") + + toOpaqueBase64(key); + } + return body; + }; + + // Lambda to check entry details in response + auto checkEntry = [](std::string const& retStr, LedgerEntry const& le, + std::optional expectedTTL, + uint32_t ledgerSeq) -> bool { + Json::Value root; + Json::Reader reader; + REQUIRE(reader.parse(retStr, root)); + REQUIRE(root.isMember("entries")); + REQUIRE(root.isMember("ledgerSeq")); + REQUIRE(root["ledgerSeq"].asUInt() == ledgerSeq); + + auto const& entries = root["entries"]; + for (auto const& entry : entries) + { + REQUIRE(entry.isMember("e")); + REQUIRE(entry.isMember("state")); + + LedgerEntry responseLE; + fromOpaqueBase64(responseLE, entry["e"].asString()); + if (responseLE == le) + { + std::string expectedState; + if (!isSorobanEntry(le.data)) + { + expectedState = "live"; + } + else + { + if (expectedTTL) + { + if (ledgerSeq >= *expectedTTL) + { + expectedState = "archived"; + } + else + { + expectedState = "live"; + } + } + else + { + expectedState = "evicted"; + } + } + + REQUIRE(entry["state"].asString() == expectedState); + if (isSorobanEntry(le.data) && expectedState == "live") + { + REQUIRE(entry.isMember("ttl")); + REQUIRE(entry["ttl"].asUInt() == *expectedTTL); + } + else + { + REQUIRE(!entry.isMember("ttl")); + } + + return true; + } + } + return false; + }; + + // Lambda to check new entry response + auto checkNewEntry = [](std::string const& retStr, LedgerKey const& key, + uint32_t ledgerSeq) -> bool { + Json::Value root; + Json::Reader reader; + REQUIRE(reader.parse(retStr, root)); + REQUIRE(root.isMember("entries")); + REQUIRE(root.isMember("ledgerSeq")); + REQUIRE(root["ledgerSeq"].asUInt() == ledgerSeq); + + auto const& entries = root["entries"]; + for (auto const& entry : entries) + { + REQUIRE(entry.isMember("e")); + REQUIRE(entry.isMember("state")); + REQUIRE(entry["state"].asString() == "new"); + + LedgerKey responseKey; + fromOpaqueBase64(responseKey, entry["e"].asString()); + if (responseKey == key) + { + REQUIRE(!entry.isMember("ttl")); + return true; + } + } + return false; + }; + + UnorderedSet seenKeys; + for (auto const& [lk, le] : liveEntryMap) + { + auto body = buildRequestBody(std::nullopt, {lk}); + std::string retStr; + std::string empty; + REQUIRE(qServer->getLedgerEntry(empty, body, retStr)); + + auto ttlIter = liveTTLEntryMap.find(lk); + std::optional expectedTTL = + ttlIter != liveTTLEntryMap.end() + ? std::make_optional(ttlIter->second) + : std::nullopt; + REQUIRE( + checkEntry(retStr, le, expectedTTL, lm.getLastClosedLedgerNum())); + + // Remove any duplicates we've already found + archivedEntryMap.erase(lk); + seenKeys.insert(lk); + } + + for (auto const& [lk, le] : archivedEntryMap) + { + auto body = buildRequestBody(std::nullopt, {lk}); + std::string retStr; + std::string empty; + REQUIRE(qServer->getLedgerEntry(empty, body, retStr)); + REQUIRE( + checkEntry(retStr, le, std::nullopt, lm.getLastClosedLedgerNum())); + seenKeys.insert(lk); + } + + // Now check for new entries + auto newKeys = LedgerTestUtils::generateValidUniqueLedgerKeysWithTypes( + {TRUSTLINE, CONTRACT_DATA, CONTRACT_CODE}, 5, seenKeys); + for (auto const& key : newKeys) + { + auto body = buildRequestBody(std::nullopt, {key}); + std::string retStr; + std::string empty; + REQUIRE(qServer->getLedgerEntry(empty, body, retStr)); + REQUIRE(checkNewEntry(retStr, key, lm.getLastClosedLedgerNum())); + } +}