Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WIP: Background tryAdd functionality in TransactionQueue #4617

Draft
wants to merge 9 commits into
base: master
Choose a base branch
from
94 changes: 63 additions & 31 deletions src/herder/HerderImpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -83,10 +83,7 @@ HerderImpl::SCPMetrics::SCPMetrics(Application& app)
}

HerderImpl::HerderImpl(Application& app)
: mTransactionQueue(app, TRANSACTION_QUEUE_TIMEOUT_LEDGERS,
TRANSACTION_QUEUE_BAN_LEDGERS,
TRANSACTION_QUEUE_SIZE_MULTIPLIER)
, mPendingEnvelopes(app, *this)
: mPendingEnvelopes(app, *this)
, mHerderSCPDriver(app, *this, mUpgrades, mPendingEnvelopes)
, mLastSlotSaved(0)
, mTrackingTimer(app)
Expand Down Expand Up @@ -275,7 +272,10 @@ HerderImpl::shutdown()
"Shutdown interrupting quorum transitive closure analysis.");
mLastQuorumMapIntersectionState.mInterruptFlag = true;
}
mTransactionQueue.shutdown();
if (mTransactionQueue)
{
mTransactionQueue->shutdown();
}
if (mSorobanTransactionQueue)
{
mSorobanTransactionQueue->shutdown();
Expand Down Expand Up @@ -603,7 +603,7 @@ HerderImpl::recvTransaction(TransactionFrameBasePtr tx, bool submittedFromSelf)
mSorobanTransactionQueue->sourceAccountPending(tx->getSourceID()) &&
!tx->isSoroban();
bool hasClassic =
mTransactionQueue.sourceAccountPending(tx->getSourceID()) &&
mTransactionQueue->sourceAccountPending(tx->getSourceID()) &&
tx->isSoroban();
if (hasSoroban || hasClassic)
{
Expand All @@ -617,11 +617,31 @@ HerderImpl::recvTransaction(TransactionFrameBasePtr tx, bool submittedFromSelf)
}
else if (!tx->isSoroban())
{
result = mTransactionQueue.tryAdd(tx, submittedFromSelf);
if (mApp.getConfig().BACKGROUND_TX_QUEUE && !submittedFromSelf)
{
mApp.postOnOverlayThread(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was hoping we could move the whole flow Peer::recvTransaction to the background to avoid going to the main thread. Right now the flow is:

  • Transactions is read on the overlay thread
  • Transaction is sent to an appropriate queue on the main thread
  • Adding to the tx queue is done on the overlay thread.

I think the switch between threads will prevent us from observing performance benefits, because txs will be stuck in the main thread scheduler. Ideally, we can bypass the scheduler, and add to the queue directly.

[this, tx]() { mTransactionQueue->tryAdd(tx, false); },
"try add tx");
result.code = TransactionQueue::AddResultCode::ADD_STATUS_UNKNOWN;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this will go away if you move this flow to the background completely, but I wanted to point out that we don't want to introduce "unknown" add status. Right now the status is used for two purposes: to decide if the transaction should be flooded, and to return something to the user submitting the transaction. The latter one is important, because downstream systems depend on submission results. We should structure the code such that the submission result is decided synchronously during transaction submission. Note that for local transaction submission (in CommandHandler::tx), we can do it on the main thread for this first iteration.

}
else
{
result = mTransactionQueue->tryAdd(tx, submittedFromSelf);
}
}
else if (mSorobanTransactionQueue)
{
result = mSorobanTransactionQueue->tryAdd(tx, submittedFromSelf);
if (mApp.getConfig().BACKGROUND_TX_QUEUE && !submittedFromSelf)
{
mApp.postOnOverlayThread(
[this, tx]() { mSorobanTransactionQueue->tryAdd(tx, false); },
"try add tx");
result.code = TransactionQueue::AddResultCode::ADD_STATUS_UNKNOWN;
}
else
{
result = mSorobanTransactionQueue->tryAdd(tx, submittedFromSelf);
}
}
else
{
Expand Down Expand Up @@ -923,7 +943,7 @@ HerderImpl::externalizeValue(TxSetXDRFrameConstPtr txSet, uint32_t ledgerSeq,
bool
HerderImpl::sourceAccountPending(AccountID const& accountID) const
{
bool accPending = mTransactionQueue.sourceAccountPending(accountID);
bool accPending = mTransactionQueue->sourceAccountPending(accountID);
if (mSorobanTransactionQueue)
{
accPending = accPending ||
Expand Down Expand Up @@ -1092,7 +1112,7 @@ HerderImpl::getPendingEnvelopes()
ClassicTransactionQueue&
HerderImpl::getTransactionQueue()
{
return mTransactionQueue;
return *mTransactionQueue;
}
SorobanTransactionQueue&
HerderImpl::getSorobanTransactionQueue()
Expand Down Expand Up @@ -1391,7 +1411,7 @@ HerderImpl::triggerNextLedger(uint32_t ledgerSeqToTrigger,
// it's guaranteed to be up-to-date
auto const& lcl = mLedgerManager.getLastClosedLedgerHeader();
PerPhaseTransactionList txPhases;
txPhases.emplace_back(mTransactionQueue.getTransactions(lcl.header));
txPhases.emplace_back(mTransactionQueue->getTransactions(lcl.header));

if (protocolVersionStartsFrom(lcl.header.ledgerVersion,
SOROBAN_PROTOCOL_VERSION))
Expand Down Expand Up @@ -1470,7 +1490,7 @@ HerderImpl::triggerNextLedger(uint32_t ledgerSeqToTrigger,
invalidTxPhases[static_cast<size_t>(TxSetPhase::SOROBAN)]);
}

mTransactionQueue.ban(
mTransactionQueue->ban(
invalidTxPhases[static_cast<size_t>(TxSetPhase::CLASSIC)]);

auto txSetHash = proposedSet->getContentsHash();
Expand Down Expand Up @@ -2172,9 +2192,11 @@ HerderImpl::maybeSetupSorobanQueue(uint32_t protocolVersion)
{
if (!mSorobanTransactionQueue)
{
releaseAssert(mTxQueueBucketSnapshot);
mSorobanTransactionQueue =
std::make_unique<SorobanTransactionQueue>(
mApp, TRANSACTION_QUEUE_TIMEOUT_LEDGERS,
mApp, mTxQueueBucketSnapshot,
TRANSACTION_QUEUE_TIMEOUT_LEDGERS,
TRANSACTION_QUEUE_BAN_LEDGERS,
SOROBAN_TRANSACTION_QUEUE_SIZE_MULTIPLIER);
}
Expand All @@ -2189,6 +2211,15 @@ HerderImpl::maybeSetupSorobanQueue(uint32_t protocolVersion)
void
HerderImpl::start()
{
releaseAssert(!mTxQueueBucketSnapshot);
mTxQueueBucketSnapshot = mApp.getBucketManager()
.getBucketSnapshotManager()
.copySearchableLiveBucketListSnapshot();
releaseAssert(!mTransactionQueue);
mTransactionQueue = std::make_unique<ClassicTransactionQueue>(
mApp, mTxQueueBucketSnapshot, TRANSACTION_QUEUE_TIMEOUT_LEDGERS,
TRANSACTION_QUEUE_BAN_LEDGERS, TRANSACTION_QUEUE_SIZE_MULTIPLIER);

mMaxTxSize = mApp.getHerder().getMaxClassicTxSize();
{
uint32_t version = mApp.getLedgerManager()
Expand Down Expand Up @@ -2333,23 +2364,23 @@ HerderImpl::updateTransactionQueue(TxSetXDRFrameConstPtr externalizedTxSet)

auto lhhe = mLedgerManager.getLastClosedLedgerHeader();

auto updateQueue = [&](auto& queue, auto const& applied) {
queue.removeApplied(applied);
queue.shift();

auto txs = queue.getTransactions(lhhe.header);

auto invalidTxs = TxSetUtils::getInvalidTxList(
auto filterInvalidTxs = [&](TxFrameList const& txs) {
return TxSetUtils::getInvalidTxList(
txs, mApp, 0,
getUpperBoundCloseTimeOffset(mApp, lhhe.header.scpValue.closeTime));
queue.ban(invalidTxs);

queue.rebroadcast();
getUpperBoundCloseTimeOffset(mApp.getAppConnector(),
lhhe.header.scpValue.closeTime));
};
// Update bucket list snapshot, if needed. Note that this modifies the
// pointer itself on update, so we need to pass the potentially new pointer
// to the tx queues.
mApp.getBucketManager()
.getBucketSnapshotManager()
.maybeCopySearchableBucketListSnapshot(mTxQueueBucketSnapshot);
if (txsPerPhase.size() > static_cast<size_t>(TxSetPhase::CLASSIC))
{
updateQueue(mTransactionQueue,
txsPerPhase[static_cast<size_t>(TxSetPhase::CLASSIC)]);
mTransactionQueue->update(
txsPerPhase[static_cast<size_t>(TxSetPhase::CLASSIC)], lhhe.header,
mTxQueueBucketSnapshot, filterInvalidTxs);
}

// Even if we're in protocol 20, still check for number of phases, in case
Expand All @@ -2358,8 +2389,9 @@ HerderImpl::updateTransactionQueue(TxSetXDRFrameConstPtr externalizedTxSet)
if (mSorobanTransactionQueue != nullptr &&
txsPerPhase.size() > static_cast<size_t>(TxSetPhase::SOROBAN))
{
updateQueue(*mSorobanTransactionQueue,
txsPerPhase[static_cast<size_t>(TxSetPhase::SOROBAN)]);
mSorobanTransactionQueue->update(
txsPerPhase[static_cast<size_t>(TxSetPhase::SOROBAN)], lhhe.header,
mTxQueueBucketSnapshot, filterInvalidTxs);
}
}

Expand Down Expand Up @@ -2476,7 +2508,7 @@ HerderImpl::isNewerNominationOrBallotSt(SCPStatement const& oldSt,
size_t
HerderImpl::getMaxQueueSizeOps() const
{
return mTransactionQueue.getMaxQueueSizeOps();
return mTransactionQueue->getMaxQueueSizeOps();
}

size_t
Expand All @@ -2490,7 +2522,7 @@ HerderImpl::getMaxQueueSizeSorobanOps() const
bool
HerderImpl::isBannedTx(Hash const& hash) const
{
auto banned = mTransactionQueue.isBanned(hash);
auto banned = mTransactionQueue->isBanned(hash);
if (mSorobanTransactionQueue)
{
banned = banned || mSorobanTransactionQueue->isBanned(hash);
Expand All @@ -2501,7 +2533,7 @@ HerderImpl::isBannedTx(Hash const& hash) const
TransactionFrameBaseConstPtr
HerderImpl::getTx(Hash const& hash) const
{
auto classic = mTransactionQueue.getTx(hash);
auto classic = mTransactionQueue->getTx(hash);
if (!classic && mSorobanTransactionQueue)
{
return mSorobanTransactionQueue->getTx(hash);
Expand Down
5 changes: 4 additions & 1 deletion src/herder/HerderImpl.h
Original file line number Diff line number Diff line change
Expand Up @@ -248,7 +248,7 @@ class HerderImpl : public Herder
void purgeOldPersistedTxSets();
void writeDebugTxSet(LedgerCloseData const& lcd);

ClassicTransactionQueue mTransactionQueue;
std::unique_ptr<ClassicTransactionQueue> mTransactionQueue;
std::unique_ptr<SorobanTransactionQueue> mSorobanTransactionQueue;

void updateTransactionQueue(TxSetXDRFrameConstPtr txSet);
Expand Down Expand Up @@ -301,6 +301,9 @@ class HerderImpl : public Herder
Application& mApp;
LedgerManager& mLedgerManager;

// Bucket list snapshot to use for transaction queues
SearchableSnapshotConstPtr mTxQueueBucketSnapshot;

struct SCPMetrics
{
medida::Meter& mLostSync;
Expand Down
Loading
Loading