-
Notifications
You must be signed in to change notification settings - Fork 979
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
WIP: Background tryAdd
functionality in TransactionQueue
#4617
base: master
Are you sure you want to change the base?
Changes from all commits
84444eb
5c77e4d
ab96235
3ccb6e1
ff35c02
6410694
139dc27
992bccd
18e03a9
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -83,10 +83,7 @@ HerderImpl::SCPMetrics::SCPMetrics(Application& app) | |
} | ||
|
||
HerderImpl::HerderImpl(Application& app) | ||
: mTransactionQueue(app, TRANSACTION_QUEUE_TIMEOUT_LEDGERS, | ||
TRANSACTION_QUEUE_BAN_LEDGERS, | ||
TRANSACTION_QUEUE_SIZE_MULTIPLIER) | ||
, mPendingEnvelopes(app, *this) | ||
: mPendingEnvelopes(app, *this) | ||
, mHerderSCPDriver(app, *this, mUpgrades, mPendingEnvelopes) | ||
, mLastSlotSaved(0) | ||
, mTrackingTimer(app) | ||
|
@@ -275,7 +272,10 @@ HerderImpl::shutdown() | |
"Shutdown interrupting quorum transitive closure analysis."); | ||
mLastQuorumMapIntersectionState.mInterruptFlag = true; | ||
} | ||
mTransactionQueue.shutdown(); | ||
if (mTransactionQueue) | ||
{ | ||
mTransactionQueue->shutdown(); | ||
} | ||
if (mSorobanTransactionQueue) | ||
{ | ||
mSorobanTransactionQueue->shutdown(); | ||
|
@@ -603,7 +603,7 @@ HerderImpl::recvTransaction(TransactionFrameBasePtr tx, bool submittedFromSelf) | |
mSorobanTransactionQueue->sourceAccountPending(tx->getSourceID()) && | ||
!tx->isSoroban(); | ||
bool hasClassic = | ||
mTransactionQueue.sourceAccountPending(tx->getSourceID()) && | ||
mTransactionQueue->sourceAccountPending(tx->getSourceID()) && | ||
tx->isSoroban(); | ||
if (hasSoroban || hasClassic) | ||
{ | ||
|
@@ -617,11 +617,31 @@ HerderImpl::recvTransaction(TransactionFrameBasePtr tx, bool submittedFromSelf) | |
} | ||
else if (!tx->isSoroban()) | ||
{ | ||
result = mTransactionQueue.tryAdd(tx, submittedFromSelf); | ||
if (mApp.getConfig().BACKGROUND_TX_QUEUE && !submittedFromSelf) | ||
{ | ||
mApp.postOnOverlayThread( | ||
[this, tx]() { mTransactionQueue->tryAdd(tx, false); }, | ||
"try add tx"); | ||
result.code = TransactionQueue::AddResultCode::ADD_STATUS_UNKNOWN; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think this will go away if you move this flow to the background completely, but I wanted to point out that we don't want to introduce "unknown" add status. Right now the status is used for two purposes: to decide if the transaction should be flooded, and to return something to the user submitting the transaction. The latter one is important, because downstream systems depend on submission results. We should structure the code such that the submission result is decided synchronously during transaction submission. Note that for local transaction submission (in CommandHandler::tx), we can do it on the main thread for this first iteration. |
||
} | ||
else | ||
{ | ||
result = mTransactionQueue->tryAdd(tx, submittedFromSelf); | ||
} | ||
} | ||
else if (mSorobanTransactionQueue) | ||
{ | ||
result = mSorobanTransactionQueue->tryAdd(tx, submittedFromSelf); | ||
if (mApp.getConfig().BACKGROUND_TX_QUEUE && !submittedFromSelf) | ||
{ | ||
mApp.postOnOverlayThread( | ||
[this, tx]() { mSorobanTransactionQueue->tryAdd(tx, false); }, | ||
"try add tx"); | ||
result.code = TransactionQueue::AddResultCode::ADD_STATUS_UNKNOWN; | ||
} | ||
else | ||
{ | ||
result = mSorobanTransactionQueue->tryAdd(tx, submittedFromSelf); | ||
} | ||
} | ||
else | ||
{ | ||
|
@@ -923,7 +943,7 @@ HerderImpl::externalizeValue(TxSetXDRFrameConstPtr txSet, uint32_t ledgerSeq, | |
bool | ||
HerderImpl::sourceAccountPending(AccountID const& accountID) const | ||
{ | ||
bool accPending = mTransactionQueue.sourceAccountPending(accountID); | ||
bool accPending = mTransactionQueue->sourceAccountPending(accountID); | ||
if (mSorobanTransactionQueue) | ||
{ | ||
accPending = accPending || | ||
|
@@ -1092,7 +1112,7 @@ HerderImpl::getPendingEnvelopes() | |
ClassicTransactionQueue& | ||
HerderImpl::getTransactionQueue() | ||
{ | ||
return mTransactionQueue; | ||
return *mTransactionQueue; | ||
} | ||
SorobanTransactionQueue& | ||
HerderImpl::getSorobanTransactionQueue() | ||
|
@@ -1391,7 +1411,7 @@ HerderImpl::triggerNextLedger(uint32_t ledgerSeqToTrigger, | |
// it's guaranteed to be up-to-date | ||
auto const& lcl = mLedgerManager.getLastClosedLedgerHeader(); | ||
PerPhaseTransactionList txPhases; | ||
txPhases.emplace_back(mTransactionQueue.getTransactions(lcl.header)); | ||
txPhases.emplace_back(mTransactionQueue->getTransactions(lcl.header)); | ||
|
||
if (protocolVersionStartsFrom(lcl.header.ledgerVersion, | ||
SOROBAN_PROTOCOL_VERSION)) | ||
|
@@ -1470,7 +1490,7 @@ HerderImpl::triggerNextLedger(uint32_t ledgerSeqToTrigger, | |
invalidTxPhases[static_cast<size_t>(TxSetPhase::SOROBAN)]); | ||
} | ||
|
||
mTransactionQueue.ban( | ||
mTransactionQueue->ban( | ||
invalidTxPhases[static_cast<size_t>(TxSetPhase::CLASSIC)]); | ||
|
||
auto txSetHash = proposedSet->getContentsHash(); | ||
|
@@ -2172,9 +2192,11 @@ HerderImpl::maybeSetupSorobanQueue(uint32_t protocolVersion) | |
{ | ||
if (!mSorobanTransactionQueue) | ||
{ | ||
releaseAssert(mTxQueueBucketSnapshot); | ||
mSorobanTransactionQueue = | ||
std::make_unique<SorobanTransactionQueue>( | ||
mApp, TRANSACTION_QUEUE_TIMEOUT_LEDGERS, | ||
mApp, mTxQueueBucketSnapshot, | ||
TRANSACTION_QUEUE_TIMEOUT_LEDGERS, | ||
TRANSACTION_QUEUE_BAN_LEDGERS, | ||
SOROBAN_TRANSACTION_QUEUE_SIZE_MULTIPLIER); | ||
} | ||
|
@@ -2189,6 +2211,15 @@ HerderImpl::maybeSetupSorobanQueue(uint32_t protocolVersion) | |
void | ||
HerderImpl::start() | ||
{ | ||
releaseAssert(!mTxQueueBucketSnapshot); | ||
mTxQueueBucketSnapshot = mApp.getBucketManager() | ||
.getBucketSnapshotManager() | ||
.copySearchableLiveBucketListSnapshot(); | ||
releaseAssert(!mTransactionQueue); | ||
mTransactionQueue = std::make_unique<ClassicTransactionQueue>( | ||
mApp, mTxQueueBucketSnapshot, TRANSACTION_QUEUE_TIMEOUT_LEDGERS, | ||
TRANSACTION_QUEUE_BAN_LEDGERS, TRANSACTION_QUEUE_SIZE_MULTIPLIER); | ||
|
||
mMaxTxSize = mApp.getHerder().getMaxClassicTxSize(); | ||
{ | ||
uint32_t version = mApp.getLedgerManager() | ||
|
@@ -2333,23 +2364,23 @@ HerderImpl::updateTransactionQueue(TxSetXDRFrameConstPtr externalizedTxSet) | |
|
||
auto lhhe = mLedgerManager.getLastClosedLedgerHeader(); | ||
|
||
auto updateQueue = [&](auto& queue, auto const& applied) { | ||
queue.removeApplied(applied); | ||
queue.shift(); | ||
|
||
auto txs = queue.getTransactions(lhhe.header); | ||
|
||
auto invalidTxs = TxSetUtils::getInvalidTxList( | ||
auto filterInvalidTxs = [&](TxFrameList const& txs) { | ||
return TxSetUtils::getInvalidTxList( | ||
txs, mApp, 0, | ||
getUpperBoundCloseTimeOffset(mApp, lhhe.header.scpValue.closeTime)); | ||
queue.ban(invalidTxs); | ||
|
||
queue.rebroadcast(); | ||
getUpperBoundCloseTimeOffset(mApp.getAppConnector(), | ||
lhhe.header.scpValue.closeTime)); | ||
}; | ||
// Update bucket list snapshot, if needed. Note that this modifies the | ||
// pointer itself on update, so we need to pass the potentially new pointer | ||
// to the tx queues. | ||
mApp.getBucketManager() | ||
.getBucketSnapshotManager() | ||
.maybeCopySearchableBucketListSnapshot(mTxQueueBucketSnapshot); | ||
if (txsPerPhase.size() > static_cast<size_t>(TxSetPhase::CLASSIC)) | ||
{ | ||
updateQueue(mTransactionQueue, | ||
txsPerPhase[static_cast<size_t>(TxSetPhase::CLASSIC)]); | ||
mTransactionQueue->update( | ||
txsPerPhase[static_cast<size_t>(TxSetPhase::CLASSIC)], lhhe.header, | ||
mTxQueueBucketSnapshot, filterInvalidTxs); | ||
} | ||
|
||
// Even if we're in protocol 20, still check for number of phases, in case | ||
|
@@ -2358,8 +2389,9 @@ HerderImpl::updateTransactionQueue(TxSetXDRFrameConstPtr externalizedTxSet) | |
if (mSorobanTransactionQueue != nullptr && | ||
txsPerPhase.size() > static_cast<size_t>(TxSetPhase::SOROBAN)) | ||
{ | ||
updateQueue(*mSorobanTransactionQueue, | ||
txsPerPhase[static_cast<size_t>(TxSetPhase::SOROBAN)]); | ||
mSorobanTransactionQueue->update( | ||
txsPerPhase[static_cast<size_t>(TxSetPhase::SOROBAN)], lhhe.header, | ||
mTxQueueBucketSnapshot, filterInvalidTxs); | ||
} | ||
} | ||
|
||
|
@@ -2476,7 +2508,7 @@ HerderImpl::isNewerNominationOrBallotSt(SCPStatement const& oldSt, | |
size_t | ||
HerderImpl::getMaxQueueSizeOps() const | ||
{ | ||
return mTransactionQueue.getMaxQueueSizeOps(); | ||
return mTransactionQueue->getMaxQueueSizeOps(); | ||
} | ||
|
||
size_t | ||
|
@@ -2490,7 +2522,7 @@ HerderImpl::getMaxQueueSizeSorobanOps() const | |
bool | ||
HerderImpl::isBannedTx(Hash const& hash) const | ||
{ | ||
auto banned = mTransactionQueue.isBanned(hash); | ||
auto banned = mTransactionQueue->isBanned(hash); | ||
if (mSorobanTransactionQueue) | ||
{ | ||
banned = banned || mSorobanTransactionQueue->isBanned(hash); | ||
|
@@ -2501,7 +2533,7 @@ HerderImpl::isBannedTx(Hash const& hash) const | |
TransactionFrameBaseConstPtr | ||
HerderImpl::getTx(Hash const& hash) const | ||
{ | ||
auto classic = mTransactionQueue.getTx(hash); | ||
auto classic = mTransactionQueue->getTx(hash); | ||
if (!classic && mSorobanTransactionQueue) | ||
{ | ||
return mSorobanTransactionQueue->getTx(hash); | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I was hoping we could move the whole flow Peer::recvTransaction to the background to avoid going to the main thread. Right now the flow is:
I think the switch between threads will prevent us from observing performance benefits, because txs will be stuck in the main thread scheduler. Ideally, we can bypass the scheduler, and add to the queue directly.