Skip to content

Commit

Permalink
Added state archival getledgerentry http endpoint
Browse files Browse the repository at this point in the history
  • Loading branch information
SirTyson committed Jan 22, 2025
1 parent e258f51 commit e700d1c
Show file tree
Hide file tree
Showing 5 changed files with 525 additions and 13 deletions.
40 changes: 34 additions & 6 deletions src/bucket/BucketSnapshotManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,17 @@ BucketSnapshotManager::recordBulkLoadMetrics(std::string const& label,
return iter->second;
}

namespace
{
// Decides whether a caller-held snapshot copy has to be refreshed from the
// canonical snapshot `curr`: true when the caller holds no snapshot at all,
// or when its ledger sequence is lower than that of `curr`. `curr` is
// dereferenced unconditionally, so it must not be null.
template <typename T, typename U>
bool
needsUpdate(std::shared_ptr<T const> const& snapshot,
            SnapshotPtrT<U> const& curr)
{
    if (snapshot == nullptr)
    {
        return true;
    }

    return curr->getLedgerSeq() > snapshot->getLedgerSeq();
}
}

void
BucketSnapshotManager::maybeCopySearchableBucketListSnapshot(
SearchableSnapshotConstPtr& snapshot)
Expand All @@ -123,9 +134,7 @@ BucketSnapshotManager::maybeCopySearchableBucketListSnapshot(
// modified. Rather, a thread is checking its copy against the canonical
// snapshot, so use a shared lock.
std::shared_lock<std::shared_mutex> lock(mSnapshotMutex);

if (!snapshot ||
snapshot->getLedgerSeq() < mCurrLiveSnapshot->getLedgerSeq())
if (needsUpdate(snapshot, mCurrLiveSnapshot))
{
snapshot = copySearchableLiveBucketListSnapshot();
}
Expand All @@ -139,14 +148,33 @@ BucketSnapshotManager::maybeCopySearchableHotArchiveBucketListSnapshot(
// modified. Rather, a thread is checking its copy against the canonical
// snapshot, so use a shared lock.
std::shared_lock<std::shared_mutex> lock(mSnapshotMutex);

if (!snapshot ||
snapshot->getLedgerSeq() < mCurrHotArchiveSnapshot->getLedgerSeq())
if (needsUpdate(snapshot, mCurrHotArchiveSnapshot))
{
snapshot = copySearchableHotArchiveBucketListSnapshot();
}
}

void
BucketSnapshotManager::maybeCopyLiveAndHotArchiveSnapshots(
    SearchableSnapshotConstPtr& liveSnapshot,
    SearchableHotArchiveSnapshotConstPtr& hotArchiveSnapshot)
{
    // Only the caller's copies are (possibly) replaced here; the canonical
    // snapshots owned by the manager are merely read, so a shared lock is
    // enough. The lock is held across both refreshes so that the two copies
    // handed back to the caller are consistent with each other.
    std::shared_lock<std::shared_mutex> sharedGuard(mSnapshotMutex);

    bool const liveIsStale = needsUpdate(liveSnapshot, mCurrLiveSnapshot);
    if (liveIsStale)
    {
        liveSnapshot = copySearchableLiveBucketListSnapshot();
    }

    bool const hotArchiveIsStale =
        needsUpdate(hotArchiveSnapshot, mCurrHotArchiveSnapshot);
    if (hotArchiveIsStale)
    {
        hotArchiveSnapshot = copySearchableHotArchiveBucketListSnapshot();
    }
}

void
BucketSnapshotManager::updateCurrentSnapshot(
SnapshotPtrT<LiveBucket>&& liveSnapshot,
Expand Down
7 changes: 7 additions & 0 deletions src/bucket/BucketSnapshotManager.h
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,13 @@ class BucketSnapshotManager : NonMovableOrCopyable
void maybeCopySearchableHotArchiveBucketListSnapshot(
SearchableHotArchiveSnapshotConstPtr& snapshot);

// Refreshes the caller's live and hot archive snapshot copies in one call.
// Like the individual snapshot refreshers above, a copy is only replaced
// when it is null or has a lower ledger sequence than the manager's current
// snapshot. Unlike them, both copies are checked and refreshed while the
// snapshot lock is held once, which guarantees that both snapshots are
// consistent with the same lcl. This is required when querying both
// snapshot types as part of the same query.
void maybeCopyLiveAndHotArchiveSnapshots(
SearchableSnapshotConstPtr& liveSnapshot,
SearchableHotArchiveSnapshotConstPtr& hotArchiveSnapshot);

// All metric recording functions must only be called by the main thread
void startPointLoadTimer() const;
void endPointLoadTimer(LedgerEntryType t, bool bloomMiss) const;
Expand Down
225 changes: 220 additions & 5 deletions src/main/QueryServer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,10 @@
#include "bucket/BucketSnapshotManager.h"
#include "bucket/SearchableBucketList.h"
#include "ledger/LedgerTxnImpl.h"
#include "ledger/LedgerTypeUtils.h"
#include "util/Logging.h"
#include "util/XDRStream.h" // IWYU pragma: keep
#include "util/types.h"
#include <exception>
#include <json/json.h>

Expand Down Expand Up @@ -54,7 +56,12 @@ namespace stellar
{
QueryServer::QueryServer(const std::string& address, unsigned short port,
int maxClient, size_t threadPoolSize,
BucketSnapshotManager& bucketSnapshotManager)
BucketSnapshotManager& bucketSnapshotManager
#ifdef BUILD_TESTS
,
bool useMainThreadForTesting
#endif
)
: mServer(address, port, maxClient, threadPoolSize)
, mBucketSnapshotManager(bucketSnapshotManager)
{
Expand All @@ -63,12 +70,28 @@ QueryServer::QueryServer(const std::string& address, unsigned short port,

mServer.add404(std::bind(&QueryServer::notFound, this, _1, _2, _3));
addRoute("getledgerentryraw", &QueryServer::getLedgerEntryRaw);
addRoute("getledgerentry", &QueryServer::getLedgerEntry);

auto workerPids = mServer.start();
for (auto pid : workerPids)
#ifdef BUILD_TESTS
if (useMainThreadForTesting)
{
mBucketListSnapshots[pid] = std::move(
bucketSnapshotManager.copySearchableLiveBucketListSnapshot());
mBucketListSnapshots[std::this_thread::get_id()] =
bucketSnapshotManager.copySearchableLiveBucketListSnapshot();
mHotArchiveBucketListSnapshots[std::this_thread::get_id()] =
bucketSnapshotManager.copySearchableHotArchiveBucketListSnapshot();
}
else
#endif
{
auto workerPids = mServer.start();
for (auto pid : workerPids)
{
mBucketListSnapshots[pid] =
bucketSnapshotManager.copySearchableLiveBucketListSnapshot();
mHotArchiveBucketListSnapshots[pid] =
bucketSnapshotManager
.copySearchableHotArchiveBucketListSnapshot();
}
}
}

Expand Down Expand Up @@ -190,4 +213,196 @@ QueryServer::getLedgerEntryRaw(std::string const& params,
retStr = Json::FastWriter().write(root);
return true;
}

// This query needs to load all the given ledger entries and their "state"
// (live, archived, evicted, new). This requires a loading entry and TTL from
// the live BucketList and then checking the Hot Archive for any keys we didn't
// find. We do three passes:
// 1. Load all keys from the live BucketList
// 2. For any Soroban keys not in the live BucketList, load them from the Hot
// Archive
// 3. Load TTL keys for any live Soroban entries found in 1.
bool
QueryServer::getLedgerEntry(std::string const& params, std::string const& body,
std::string& retStr)
{
ZoneScoped;
Json::Value root;

std::map<std::string, std::vector<std::string>> paramMap;
httpThreaded::server::server::parsePostParams(body, paramMap);

auto keys = paramMap["key"];
auto snapshotLedger = parseOptionalParam<uint32_t>(paramMap, "ledgerSeq");

if (keys.empty())
{
throw std::invalid_argument(
"Must specify ledger key in POST body: key=<LedgerKey in base64 "
"XDR format>");
}

// Get snapshots for both live and hot archive bucket lists
auto& liveBl = mBucketListSnapshots.at(std::this_thread::get_id());
auto& hotArchiveBl =
mHotArchiveBucketListSnapshots.at(std::this_thread::get_id());

// orderedNotFoundKeys is a set of keys we have not yet found (not in live
// BucketList or in an archived state in the Hot Archive)
LedgerKeySet orderedNotFoundKeys;
for (auto const& key : keys)
{
LedgerKey k;
fromOpaqueBase64(k, key);

// Check for TTL keys which are not allowed
if (k.type() == TTL)
{
retStr = "TTL keys are not allowed";
return false;
}

orderedNotFoundKeys.emplace(k);
}

mBucketSnapshotManager.maybeCopyLiveAndHotArchiveSnapshots(liveBl,
hotArchiveBl);

std::vector<LedgerEntry> liveEntries;
std::vector<HotArchiveBucketEntry> archivedEntries;
uint32_t ledgerSeq =
snapshotLedger ? *snapshotLedger : liveBl->getLedgerSeq();
root["ledgerSeq"] = ledgerSeq;

auto liveEntriesOp =
liveBl->loadKeysFromLedger(orderedNotFoundKeys, ledgerSeq);

// Return 404 if ledgerSeq not found
if (!liveEntriesOp)
{
retStr = "LedgerSeq not found";
return false;
}

liveEntries = std::move(*liveEntriesOp);

// Remove keys found in live bucketList
for (auto const& le : liveEntries)
{
orderedNotFoundKeys.erase(LedgerEntryKey(le));
}

LedgerKeySet hotArchiveKeysToSearch;
for (auto const& lk : orderedNotFoundKeys)
{
if (isSorobanEntry(lk))
{
hotArchiveKeysToSearch.emplace(lk);
}
}

// Only query archive for remaining keys
if (!hotArchiveKeysToSearch.empty())
{
auto archivedEntriesOp =
hotArchiveBl->loadKeysFromLedger(hotArchiveKeysToSearch, ledgerSeq);
if (!archivedEntriesOp)
{
retStr = "LedgerSeq not found";
return false;
}
archivedEntries = std::move(*archivedEntriesOp);
}

// Collect TTL keys for Soroban entries in the live BucketList
LedgerKeySet ttlKeys;
for (auto const& le : liveEntries)
{
if (isSorobanEntry(le.data))
{
ttlKeys.emplace(getTTLKey(le));
}
}

std::vector<LedgerEntry> ttlEntries;
if (!ttlKeys.empty())
{
// We haven't updated the live snapshot so we will never not have the
// requested ledgerSeq and return nullopt.
ttlEntries =
std::move(liveBl->loadKeysFromLedger(ttlKeys, ledgerSeq).value());
}

std::unordered_map<LedgerKey, LedgerEntry> ttlMap;
for (auto const& ttlEntry : ttlEntries)
{
ttlMap.emplace(LedgerEntryKey(ttlEntry), ttlEntry);
}

// Process live entries
for (auto const& le : liveEntries)
{
Json::Value entry;
entry["e"] = toOpaqueBase64(le);

// Check TTL for Soroban entries
if (isSorobanEntry(le.data))
{
auto ttlIter = ttlMap.find(getTTLKey(le));
releaseAssertOrThrow(ttlIter != ttlMap.end());
if (isLive(ttlIter->second, ledgerSeq))
{
entry["state"] = "live";
entry["ttl"] = ttlIter->second.data.ttl().liveUntilLedgerSeq;
}
else
{
entry["state"] = "archived";
}
}
else
{
entry["state"] = "live";
}

root["entries"].append(entry);
}

// Process archived entries - all are evicted since they come from hot
// archive
for (auto const& be : archivedEntries)
{
// If we get to this point, we know the key is not in the live
// BucketList, so if we get a DELETED or RESTORED entry, the entry is
// new wrt ledger state.
if (be.type() != HOT_ARCHIVE_ARCHIVED)
{
continue;
}

auto const& le = be.archivedEntry();

// At this point we've "found" the key and know it's archived, so remove
// it from our search set
orderedNotFoundKeys.erase(LedgerEntryKey(le));

Json::Value entry;
entry["e"] = toOpaqueBase64(le);
entry["state"] = "evicted";
root["entries"].append(entry);
}

// Since we removed entries found in the live BucketList and archived
// entries found in the Hot Archive, any remaining keys must be new.
for (auto const& key : orderedNotFoundKeys)
{
Json::Value entry;
entry["e"] = toOpaqueBase64(key);
entry["state"] = "new";
root["entries"].append(entry);
}

retStr = Json::FastWriter().write(root);
return true;
}
}
18 changes: 16 additions & 2 deletions src/main/QueryServer.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,9 @@ class QueryServer
std::unordered_map<std::thread::id, SearchableSnapshotConstPtr>
mBucketListSnapshots;

std::unordered_map<std::thread::id, SearchableHotArchiveSnapshotConstPtr>
mHotArchiveBucketListSnapshots;

BucketSnapshotManager& mBucketSnapshotManager;

bool safeRouter(HandlerRoute route, std::string const& params,
Expand All @@ -39,14 +42,25 @@ class QueryServer

void addRoute(std::string const& name, HandlerRoute route);

#ifdef BUILD_TESTS
public:
#endif
// Returns raw LedgerKeys for the given keys from the Live BucketList. Does
// not query other BucketLists or reason about archival.
bool getLedgerEntryRaw(std::string const& params, std::string const& body,
std::string& retStr);

// Returns the entries for the given keys together with their archival state
// ("live", "archived", "evicted" or "new"), consulting both the Live
// BucketList and the Hot Archive. Keys are taken from the "key" POST
// parameter in the body; TTL keys are rejected. On success the JSON result is
// written to retStr and true is returned.
bool getLedgerEntry(std::string const& params, std::string const& body,
std::string& retStr);

public:
// Constructs the query server for the given address/port and registers its
// routes. A live and a hot archive BucketList snapshot are copied from
// bucketSnapshotManager for each thread that serves queries.
// In test builds, useMainThreadForTesting = true instead registers the
// snapshots under the constructing thread's id so that handlers can be
// called directly from that thread.
// NOTE(review): the constructor body is only partially visible here; it
// appears that the worker threads are not started in that mode — confirm.
QueryServer(const std::string& address, unsigned short port, int maxClient,
size_t threadPoolSize,
BucketSnapshotManager& bucketSnapshotManager
#ifdef BUILD_TESTS
,
bool useMainThreadForTesting = false
#endif
);
};
}
}
Loading

0 comments on commit e700d1c

Please sign in to comment.