diff --git a/ledger/catchpointtracker.go b/ledger/catchpointtracker.go index dd576fc41f..32216fcb20 100644 --- a/ledger/catchpointtracker.go +++ b/ledger/catchpointtracker.go @@ -105,9 +105,6 @@ type catchpointTracker struct { // enableGeneratingCatchpointFiles determines whether catchpoints files should be generated by the trackers. enableGeneratingCatchpointFiles bool - // Prepared SQL statements for fast accounts DB lookups. - accountsq trackerdb.AccountsReader - // log copied from ledger log logging.Logger @@ -138,6 +135,9 @@ type catchpointTracker struct { // roundDigest stores the digest of the block for every round starting with dbRound+1 and every round after it. roundDigest []crypto.Digest + // consensusVersion stores the consensus versions for every round starting with dbRound+1 and every round after it. + consensusVersion []protocol.ConsensusVersion + // reenableCatchpointsRound is a round where the EnableCatchpointsWithSPContexts feature was enabled via the consensus. // we avoid generating catchpoints before that round in order to ensure the network remain consistent in the catchpoint // label being produced. This variable could be "wrong" in two cases - @@ -151,9 +151,13 @@ type catchpointTracker struct { // catchpoint files even before the protocol upgrade took place. forceCatchpointFileWriting bool - // catchpointsMu protects `roundDigest`, `reenableCatchpointsRound` and + // catchpointsMu protects roundDigest, reenableCatchpointsRound, cachedDBRound and // `lastCatchpointLabel`. catchpointsMu deadlock.RWMutex + + // cachedDBRound is always exactly tracker DB round (and therefore, accountsRound()), + // cached to use in lookup functions + cachedDBRound basics.Round } // initialize initializes the catchpointTracker structure @@ -205,7 +209,7 @@ func (ct *catchpointTracker) getSPVerificationData() (encodedData []byte, spVeri return encodedData, spVerificationHash, nil } -func (ct *catchpointTracker) finishFirstStage(ctx context.Context, dbRound basics.Round, updatingBalancesDuration time.Duration) error { +func (ct *catchpointTracker) finishFirstStage(ctx context.Context, dbRound basics.Round, blockProto protocol.ConsensusVersion, updatingBalancesDuration time.Duration) error { ct.log.Infof("finishing catchpoint's first stage dbRound: %d", dbRound) var totalKVs uint64 @@ -216,11 +220,15 @@ func (ct *catchpointTracker) finishFirstStage(ctx context.Context, dbRound basic var spVerificationEncodedData []byte var catchpointGenerationStats telemetryspec.CatchpointGenerationEventDetails - // Generate the SP Verification hash and encoded data. The hash is used in the label when tracking catchpoints, - // and the encoded data for that hash will be added to the catchpoint file if catchpoint generation is enabled. - spVerificationEncodedData, spVerificationHash, err := ct.getSPVerificationData() - if err != nil { - return err + params := config.Consensus[blockProto] + if params.EnableCatchpointsWithSPContexts { + // Generate the SP Verification hash and encoded data. The hash is used in the label when tracking catchpoints, + // and the encoded data for that hash will be added to the catchpoint file if catchpoint generation is enabled. + var err error + spVerificationEncodedData, spVerificationHash, err = ct.getSPVerificationData() + if err != nil { + return err + } } if ct.enableGeneratingCatchpointFiles { @@ -257,7 +265,7 @@ func (ct *catchpointTracker) finishFirstStage(ctx context.Context, dbRound basic // Possibly finish generating first stage catchpoint db record and data file after // a crash. -func (ct *catchpointTracker) finishFirstStageAfterCrash(dbRound basics.Round) error { +func (ct *catchpointTracker) finishFirstStageAfterCrash(dbRound basics.Round, blockProto protocol.ConsensusVersion) error { v, err := ct.catchpointStore.ReadCatchpointStateUint64( context.Background(), trackerdb.CatchpointStateWritingFirstStageInfo) if err != nil { @@ -274,10 +282,10 @@ func (ct *catchpointTracker) finishFirstStageAfterCrash(dbRound basics.Round) er return err } - return ct.finishFirstStage(context.Background(), dbRound, 0) + return ct.finishFirstStage(context.Background(), dbRound, blockProto, 0) } -func (ct *catchpointTracker) finishCatchpointsAfterCrash(catchpointLookback uint64) error { +func (ct *catchpointTracker) finishCatchpointsAfterCrash(blockProto protocol.ConsensusVersion, catchpointLookback uint64) error { records, err := ct.catchpointStore.SelectUnfinishedCatchpoints(context.Background()) if err != nil { return err @@ -292,7 +300,7 @@ func (ct *catchpointTracker) finishCatchpointsAfterCrash(catchpointLookback uint } err = ct.finishCatchpoint( - context.Background(), record.Round, record.BlockHash, catchpointLookback) + context.Background(), record.Round, record.BlockHash, blockProto, catchpointLookback) if err != nil { return err } @@ -301,8 +309,8 @@ func (ct *catchpointTracker) finishCatchpointsAfterCrash(catchpointLookback uint return nil } -func (ct *catchpointTracker) recoverFromCrash(dbRound basics.Round) error { - err := ct.finishFirstStageAfterCrash(dbRound) +func (ct *catchpointTracker) recoverFromCrash(dbRound basics.Round, blockProto protocol.ConsensusVersion) error { + err := ct.finishFirstStageAfterCrash(dbRound, blockProto) if err != nil { return err } @@ -316,7 +324,7 @@ func (ct *catchpointTracker) recoverFromCrash(dbRound basics.Round) error { } if catchpointLookback != 0 { - err = ct.finishCatchpointsAfterCrash(catchpointLookback) + err = ct.finishCatchpointsAfterCrash(blockProto, catchpointLookback) if err != nil { return err } @@ -346,11 +354,15 @@ func (ct *catchpointTracker) loadFromDisk(l ledgerForTracker, dbRound basics.Rou return err } + ct.catchpointsMu.Lock() + ct.cachedDBRound = dbRound ct.roundDigest = nil + ct.consensusVersion = nil ct.catchpointDataWriting.Store(0) // keep these channel closed if we're not generating catchpoint ct.catchpointDataSlowWriting = make(chan struct{}, 1) close(ct.catchpointDataSlowWriting) + ct.catchpointsMu.Unlock() err = ct.dbs.Transaction(func(ctx context.Context, tx trackerdb.TransactionScope) error { return ct.initializeHashes(ctx, tx, dbRound) @@ -359,18 +371,18 @@ func (ct *catchpointTracker) loadFromDisk(l ledgerForTracker, dbRound basics.Rou return err } - ct.accountsq, err = ct.dbs.MakeAccountsOptimizedReader() + ct.lastCatchpointLabel, err = ct.catchpointStore.ReadCatchpointStateString( + context.Background(), trackerdb.CatchpointStateLastCatchpoint) if err != nil { return } - ct.lastCatchpointLabel, err = ct.catchpointStore.ReadCatchpointStateString( - context.Background(), trackerdb.CatchpointStateLastCatchpoint) + hdr, err := l.BlockHdr(dbRound) if err != nil { return } - return ct.recoverFromCrash(dbRound) + return ct.recoverFromCrash(dbRound, hdr.CurrentProtocol) } // newBlock informs the tracker of a new block from round @@ -380,6 +392,7 @@ func (ct *catchpointTracker) newBlock(blk bookkeeping.Block, delta ledgercore.St defer ct.catchpointsMu.Unlock() ct.roundDigest = append(ct.roundDigest, blk.Digest()) + ct.consensusVersion = append(ct.consensusVersion, blk.CurrentProtocol) if (config.Consensus[blk.CurrentProtocol].EnableCatchpointsWithSPContexts || ct.forceCatchpointFileWriting) && ct.reenableCatchpointsRound == 0 { catchpointLookback := config.Consensus[blk.CurrentProtocol].CatchpointLookback @@ -396,7 +409,10 @@ func (ct *catchpointTracker) newBlock(blk bookkeeping.Block, delta ledgercore.St // number that can be removed from the blocks database as well as the lookback that this // tracker maintains. func (ct *catchpointTracker) committedUpTo(rnd basics.Round) (retRound, lookback basics.Round) { - return rnd, basics.Round(0) + ct.catchpointsMu.RLock() + defer ct.catchpointsMu.RUnlock() + retRound = ct.cachedDBRound + return retRound, basics.Round(0) } // Calculate whether we have intermediate first stage catchpoint rounds and the @@ -505,6 +521,8 @@ func (ct *catchpointTracker) prepareCommit(dcc *deferredCommitContext) error { dcc.committedRoundDigests = make([]crypto.Digest, dcc.offset) copy(dcc.committedRoundDigests, ct.roundDigest[:dcc.offset]) + dcc.committedProtocolVersion = make([]protocol.ConsensusVersion, dcc.offset) + copy(dcc.committedProtocolVersion, ct.consensusVersion[:dcc.offset]) return nil } @@ -601,6 +619,8 @@ func (ct *catchpointTracker) postCommit(ctx context.Context, dcc *deferredCommit ct.catchpointsMu.Lock() ct.roundDigest = ct.roundDigest[dcc.offset:] + ct.consensusVersion = ct.consensusVersion[dcc.offset:] + ct.cachedDBRound = dcc.newBase() ct.catchpointsMu.Unlock() dcc.updatingBalancesDuration = time.Since(dcc.flushTime) @@ -736,9 +756,18 @@ func repackCatchpoint(ctx context.Context, header CatchpointFileHeader, biggestC // Create a catchpoint (a label and possibly a file with db record) and remove // the unfinished catchpoint record. -func (ct *catchpointTracker) createCatchpoint(ctx context.Context, accountsRound basics.Round, round basics.Round, dataInfo trackerdb.CatchpointFirstStageInfo, blockHash crypto.Digest) error { +func (ct *catchpointTracker) createCatchpoint(ctx context.Context, accountsRound basics.Round, round basics.Round, dataInfo trackerdb.CatchpointFirstStageInfo, blockHash crypto.Digest, blockProto protocol.ConsensusVersion) error { startTime := time.Now() - labelMaker := ledgercore.MakeCatchpointLabelMakerCurrent(round, &blockHash, &dataInfo.TrieBalancesHash, dataInfo.Totals, &dataInfo.StateProofVerificationHash) + var labelMaker ledgercore.CatchpointLabelMaker + var version uint64 + params := config.Consensus[blockProto] + if params.EnableCatchpointsWithSPContexts { + labelMaker = ledgercore.MakeCatchpointLabelMakerCurrent(round, &blockHash, &dataInfo.TrieBalancesHash, dataInfo.Totals, &dataInfo.StateProofVerificationHash) + version = CatchpointFileVersionV7 + } else { + labelMaker = ledgercore.MakeCatchpointLabelMakerV6(round, &blockHash, &dataInfo.TrieBalancesHash, dataInfo.Totals) + version = CatchpointFileVersionV6 + } label := ledgercore.MakeLabel(labelMaker) ct.log.Infof( @@ -774,7 +803,7 @@ func (ct *catchpointTracker) createCatchpoint(ctx context.Context, accountsRound // Make a catchpoint file. header := CatchpointFileHeader{ - Version: CatchpointFileVersionV7, + Version: version, BalancesRound: accountsRound, BlocksRound: round, Totals: dataInfo.Totals, @@ -834,7 +863,7 @@ func (ct *catchpointTracker) createCatchpoint(ctx context.Context, accountsRound // Try create a catchpoint (a label and possibly a file with db record) and remove // the unfinished catchpoint record. -func (ct *catchpointTracker) finishCatchpoint(ctx context.Context, round basics.Round, blockHash crypto.Digest, catchpointLookback uint64) error { +func (ct *catchpointTracker) finishCatchpoint(ctx context.Context, round basics.Round, blockHash crypto.Digest, blockProto protocol.ConsensusVersion, catchpointLookback uint64) error { accountsRound := round - basics.Round(catchpointLookback) ct.log.Infof("finishing catchpoint round: %d accountsRound: %d", round, accountsRound) @@ -847,7 +876,7 @@ func (ct *catchpointTracker) finishCatchpoint(ctx context.Context, round basics. if !exists { return ct.catchpointStore.DeleteUnfinishedCatchpoint(ctx, round) } - return ct.createCatchpoint(ctx, accountsRound, round, dataInfo, blockHash) + return ct.createCatchpoint(ctx, accountsRound, round, dataInfo, blockHash, blockProto) } // Calculate catchpoint round numbers in [min, max]. `catchpointInterval` must be @@ -908,7 +937,9 @@ func (ct *catchpointTracker) pruneFirstStageRecordsData(ctx context.Context, max func (ct *catchpointTracker) postCommitUnlocked(ctx context.Context, dcc *deferredCommitContext) { if dcc.catchpointFirstStage { - err := ct.finishFirstStage(ctx, dcc.newBase(), dcc.updatingBalancesDuration) + round := dcc.newBase() + blockProto := dcc.committedProtocolVersion[round-dcc.oldBase-1] + err := ct.finishFirstStage(ctx, round, blockProto, dcc.updatingBalancesDuration) if err != nil { ct.log.Warnf( "error finishing catchpoint's first stage dcc.newBase: %d err: %v", @@ -918,8 +949,10 @@ func (ct *catchpointTracker) postCommitUnlocked(ctx context.Context, dcc *deferr // Generate catchpoints for rounds in (dcc.oldBase, dcc.newBase]. for _, round := range ct.calculateCatchpointRounds(&dcc.deferredCommitRange) { + blockHash := dcc.committedRoundDigests[round-dcc.oldBase-1] + blockProto := dcc.committedProtocolVersion[round-dcc.oldBase-1] err := ct.finishCatchpoint( - ctx, round, dcc.committedRoundDigests[round-dcc.oldBase-1], dcc.catchpointLookback) + ctx, round, blockHash, blockProto, dcc.catchpointLookback) if err != nil { ct.log.Warnf("error creating catchpoint round: %d err: %v", round, err) } @@ -1157,9 +1190,13 @@ func (ct *catchpointTracker) generateCatchpointData(ctx context.Context, account return } - err = catchpointWriter.FileWriteSPVerificationContext(encodedSPData) - if err != nil { - return + // do not write encodedSPData if not provided, + // this is an indication the older catchpoint file is being generated. + if encodedSPData != nil { + err = catchpointWriter.FileWriteSPVerificationContext(encodedSPData) + if err != nil { + return + } } for more { diff --git a/ledger/catchpointtracker_test.go b/ledger/catchpointtracker_test.go index a658cfebe4..a3a4509330 100644 --- a/ledger/catchpointtracker_test.go +++ b/ledger/catchpointtracker_test.go @@ -360,7 +360,10 @@ func createCatchpoint(t *testing.T, ct *catchpointTracker, accountsRound basics. require.Equal(t, calculateStateProofVerificationHash(t, ml), stateProofVerificationHash) - err = ct.createCatchpoint(context.Background(), accountsRound, round, trackerdb.CatchpointFirstStageInfo{BiggestChunkLen: biggestChunkLen}, crypto.Digest{}) + err = ct.createCatchpoint( + context.Background(), accountsRound, round, + trackerdb.CatchpointFirstStageInfo{BiggestChunkLen: biggestChunkLen}, + crypto.Digest{}, protocol.ConsensusCurrentVersion) require.NoError(t, err) } @@ -760,8 +763,10 @@ func TestCatchpointReproducibleLabels(t *testing.T) { // test to see that after loadFromDisk, all the tracker content is lost ( as expected ) require.NotZero(t, len(ct.roundDigest)) + require.NotZero(t, len(ct.consensusVersion)) require.NoError(t, ct.loadFromDisk(ml, ml.Latest())) require.Zero(t, len(ct.roundDigest)) + require.Zero(t, len(ct.consensusVersion)) require.Zero(t, ct.catchpointDataWriting.Load()) select { case _, closed := <-ct.catchpointDataSlowWriting: @@ -771,6 +776,56 @@ func TestCatchpointReproducibleLabels(t *testing.T) { } } +// TestCatchpointBackwardCompatibleLabels checks labels before and after EnableCatchpointsWithSPContexts was introduced. +func TestCatchpointBackwardCompatibleLabels(t *testing.T) { + partitiontest.PartitionTest(t) + + temporaryDirectory := t.TempDir() + + accts := []map[basics.Address]basics.AccountData{ledgertesting.RandomAccounts(20, true)} + ml := makeMockLedgerForTracker(t, true, 10, protocol.ConsensusCurrentVersion, accts) + defer ml.Close() + + ct := &catchpointTracker{enableGeneratingCatchpointFiles: false} + conf := config.GetDefaultLocal() + + conf.Archival = true + paths := DirsAndPrefix{ + ResolvedGenesisDirs: config.ResolvedGenesisDirs{ + CatchpointGenesisDir: ".", + HotGenesisDir: ".", + }, + } + ct.initialize(conf, paths) + + defer ct.close() + ct.dbDirectory = temporaryDirectory + ct.tmpDir = temporaryDirectory + + _, err := trackerDBInitialize(ml, true, ct.dbDirectory) + require.NoError(t, err) + + err = ct.loadFromDisk(ml, ml.Latest()) + require.NoError(t, err) + + // create catpoint with the latest version of the code + round := basics.Round(2000) + + protos := []protocol.ConsensusVersion{protocol.ConsensusCurrentVersion, protocol.ConsensusV37, protocol.ConsensusV36} + labels := make([]string, len(protos)) + for i, proto := range protos { + err = ct.createCatchpoint( + context.Background(), round-1, round, + trackerdb.CatchpointFirstStageInfo{}, + crypto.Digest{}, proto) + require.NoError(t, err) + require.NotEmpty(t, ct.lastCatchpointLabel) + labels[i] = ct.lastCatchpointLabel + } + require.NotEqual(t, labels[0], labels[1]) + require.Equal(t, labels[1], labels[2]) +} + // blockingTracker is a testing tracker used to test "what if" a tracker would get blocked. type blockingTracker struct { emptyTracker diff --git a/ledger/tracker.go b/ledger/tracker.go index 7ad5ba6641..37fa6adf5f 100644 --- a/ledger/tracker.go +++ b/ledger/tracker.go @@ -269,6 +269,9 @@ type deferredCommitContext struct { // Block hashes for the committed rounds range. committedRoundDigests []crypto.Digest + // Consensus versions for the committed rounds range. + committedProtocolVersion []protocol.ConsensusVersion + // on catchpoint rounds, the transaction tail would fill up this field with the hash of the recent 1001 rounds // of the txtail data. The catchpointTracker would be able to use that for calculating the catchpoint label. txTailHash crypto.Digest