From 4b824ee5073cf403fea3e1b650fe509f1073424f Mon Sep 17 00:00:00 2001 From: ohill <145173879+ohill@users.noreply.github.com> Date: Tue, 24 Oct 2023 14:34:00 -0400 Subject: [PATCH] catchpoint: store certs with blocks during catchpoint restore (#5798) --- catchup/catchpointService.go | 31 ++++++++++--------- .../mocks/mockCatchpointCatchupAccessor.go | 5 +-- ledger/catchupaccessor.go | 13 ++++---- ledger/catchupaccessor_test.go | 8 +++-- ledger/store/blockdb/blockdb.go | 10 +++--- .../catchup/catchpointCatchup_test.go | 4 +++ 6 files changed, 42 insertions(+), 29 deletions(-) diff --git a/catchup/catchpointService.go b/catchup/catchpointService.go index 7ad45305ff..a5175aff4c 100644 --- a/catchup/catchpointService.go +++ b/catchup/catchpointService.go @@ -24,6 +24,7 @@ import ( "github.com/algorand/go-deadlock" + "github.com/algorand/go-algorand/agreement" "github.com/algorand/go-algorand/config" "github.com/algorand/go-algorand/data/basics" "github.com/algorand/go-algorand/data/bookkeeping" @@ -370,6 +371,7 @@ func (cs *CatchpointCatchupService) processStageLatestBlockDownload() (err error attemptsCount := 0 var blk *bookkeeping.Block + var cert *agreement.Certificate // check to see if the current ledger might have this block. If so, we should try this first instead of downloading anything. if ledgerBlock, err := cs.ledger.Block(blockRound); err == nil { blk = &ledgerBlock @@ -384,7 +386,7 @@ func (cs *CatchpointCatchupService) processStageLatestBlockDownload() (err error blockDownloadDuration := time.Duration(0) if blk == nil { var stop bool - blk, blockDownloadDuration, psp, stop, err = cs.fetchBlock(blockRound, uint64(attemptsCount)) + blk, cert, blockDownloadDuration, psp, stop, err = cs.fetchBlock(blockRound, uint64(attemptsCount)) if stop { return err } else if blk == nil { @@ -462,7 +464,7 @@ func (cs *CatchpointCatchupService) processStageLatestBlockDownload() (err error return cs.abort(fmt.Errorf("processStageLatestBlockDownload failed when calling StoreBalancesRound : %v", err)) } - err = cs.ledgerAccessor.StoreFirstBlock(cs.ctx, blk) + err = cs.ledgerAccessor.StoreFirstBlock(cs.ctx, blk, cert) if err != nil { if attemptsCount <= cs.config.CatchupBlockDownloadRetryAttempts { // try again. @@ -542,6 +544,7 @@ func (cs *CatchpointCatchupService) processStageBlocksDownload() (err error) { prevBlock := &topBlock blocksFetched := uint64(1) // we already got the first block in the previous step. var blk *bookkeeping.Block + var cert *agreement.Certificate for retryCount := uint64(1); blocksFetched <= lookback; { if err := cs.ctx.Err(); err != nil { return cs.stopOrAbort() @@ -564,7 +567,7 @@ func (cs *CatchpointCatchupService) processStageBlocksDownload() (err error) { blockDownloadDuration := time.Duration(0) if blk == nil { var stop bool - blk, blockDownloadDuration, psp, stop, err = cs.fetchBlock(topBlock.Round()-basics.Round(blocksFetched), retryCount) + blk, cert, blockDownloadDuration, psp, stop, err = cs.fetchBlock(topBlock.Round()-basics.Round(blocksFetched), retryCount) if stop { return err } else if blk == nil { @@ -624,7 +627,7 @@ func (cs *CatchpointCatchupService) processStageBlocksDownload() (err error) { } // all good, persist and move on. - err = cs.ledgerAccessor.StoreBlock(cs.ctx, blk) + err = cs.ledgerAccessor.StoreBlock(cs.ctx, blk, cert) if err != nil { cs.log.Warnf("processStageBlocksDownload failed to store downloaded staging block for round %d", blk.Round()) cs.updateBlockRetrievalStatistics(-1, -1) @@ -649,17 +652,17 @@ func (cs *CatchpointCatchupService) processStageBlocksDownload() (err error) { // fetchBlock uses the internal peer selector blocksDownloadPeerSelector to pick a peer and then attempt to fetch the block requested from that peer. // The method return stop=true if the caller should exit the current operation // If the method return a nil block, the caller is expected to retry the operation, increasing the retry counter as needed. -func (cs *CatchpointCatchupService) fetchBlock(round basics.Round, retryCount uint64) (blk *bookkeeping.Block, downloadDuration time.Duration, psp *peerSelectorPeer, stop bool, err error) { +func (cs *CatchpointCatchupService) fetchBlock(round basics.Round, retryCount uint64) (blk *bookkeeping.Block, cert *agreement.Certificate, downloadDuration time.Duration, psp *peerSelectorPeer, stop bool, err error) { psp, err = cs.blocksDownloadPeerSelector.getNextPeer() if err != nil { if err == errPeerSelectorNoPeerPoolsAvailable { cs.log.Infof("fetchBlock: unable to obtain a list of peers to retrieve the latest block from; will retry shortly.") // this is a possible on startup, since the network package might have yet to retrieve the list of peers. time.Sleep(noPeersAvailableSleepInterval) - return nil, time.Duration(0), psp, false, nil + return nil, nil, time.Duration(0), psp, false, nil } err = fmt.Errorf("fetchBlock: unable to obtain a list of peers to retrieve the latest block from : %w", err) - return nil, time.Duration(0), psp, true, cs.abort(err) + return nil, nil, time.Duration(0), psp, true, cs.abort(err) } peer := psp.Peer @@ -669,26 +672,26 @@ func (cs *CatchpointCatchupService) fetchBlock(round basics.Round, retryCount ui cs.blocksDownloadPeerSelector.rankPeer(psp, peerRankInvalidDownload) if retryCount <= uint64(cs.config.CatchupBlockDownloadRetryAttempts) { // try again. - return nil, time.Duration(0), psp, false, nil + return nil, nil, time.Duration(0), psp, false, nil } - return nil, time.Duration(0), psp, true, cs.abort(fmt.Errorf("fetchBlock: recurring non-HTTP peer was provided by the peer selector")) + return nil, nil, time.Duration(0), psp, true, cs.abort(fmt.Errorf("fetchBlock: recurring non-HTTP peer was provided by the peer selector")) } fetcher := makeUniversalBlockFetcher(cs.log, cs.net, cs.config) - blk, _, downloadDuration, err = fetcher.fetchBlock(cs.ctx, round, httpPeer) + blk, cert, downloadDuration, err = fetcher.fetchBlock(cs.ctx, round, httpPeer) if err != nil { if cs.ctx.Err() != nil { - return nil, time.Duration(0), psp, true, cs.stopOrAbort() + return nil, nil, time.Duration(0), psp, true, cs.stopOrAbort() } if retryCount <= uint64(cs.config.CatchupBlockDownloadRetryAttempts) { // try again. cs.log.Infof("Failed to download block %d on attempt %d out of %d. %v", round, retryCount, cs.config.CatchupBlockDownloadRetryAttempts, err) cs.blocksDownloadPeerSelector.rankPeer(psp, peerRankDownloadFailed) - return nil, time.Duration(0), psp, false, nil + return nil, nil, time.Duration(0), psp, false, nil } - return nil, time.Duration(0), psp, true, cs.abort(fmt.Errorf("fetchBlock failed after multiple blocks download attempts")) + return nil, nil, time.Duration(0), psp, true, cs.abort(fmt.Errorf("fetchBlock failed after multiple blocks download attempts")) } // success - return blk, downloadDuration, psp, false, nil + return blk, cert, downloadDuration, psp, false, nil } // processStageLedgerDownload is the fifth catchpoint catchup stage. It completes the catchup process, swap the new tables and restart the node functionality. diff --git a/components/mocks/mockCatchpointCatchupAccessor.go b/components/mocks/mockCatchpointCatchupAccessor.go index f488879e7b..d095b703ac 100644 --- a/components/mocks/mockCatchpointCatchupAccessor.go +++ b/components/mocks/mockCatchpointCatchupAccessor.go @@ -19,6 +19,7 @@ package mocks import ( "context" + "github.com/algorand/go-algorand/agreement" "github.com/algorand/go-algorand/crypto" "github.com/algorand/go-algorand/data/basics" "github.com/algorand/go-algorand/data/bookkeeping" @@ -86,12 +87,12 @@ func (m *MockCatchpointCatchupAccessor) StoreBalancesRound(ctx context.Context, } // StoreFirstBlock stores a single block to the blocks database. -func (m *MockCatchpointCatchupAccessor) StoreFirstBlock(ctx context.Context, blk *bookkeeping.Block) (err error) { +func (m *MockCatchpointCatchupAccessor) StoreFirstBlock(ctx context.Context, blk *bookkeeping.Block, cert *agreement.Certificate) (err error) { return nil } // StoreBlock stores a single block to the blocks database. -func (m *MockCatchpointCatchupAccessor) StoreBlock(ctx context.Context, blk *bookkeeping.Block) (err error) { +func (m *MockCatchpointCatchupAccessor) StoreBlock(ctx context.Context, blk *bookkeeping.Block, cert *agreement.Certificate) (err error) { return nil } diff --git a/ledger/catchupaccessor.go b/ledger/catchupaccessor.go index b16d3a8fba..64ada07a08 100644 --- a/ledger/catchupaccessor.go +++ b/ledger/catchupaccessor.go @@ -26,6 +26,7 @@ import ( "sync" "time" + "github.com/algorand/go-algorand/agreement" "github.com/algorand/go-algorand/config" "github.com/algorand/go-algorand/crypto" "github.com/algorand/go-algorand/crypto/merkletrie" @@ -78,10 +79,10 @@ type CatchpointCatchupAccessor interface { StoreBalancesRound(ctx context.Context, blk *bookkeeping.Block) (err error) // StoreFirstBlock stores a single block to the blocks database. - StoreFirstBlock(ctx context.Context, blk *bookkeeping.Block) (err error) + StoreFirstBlock(ctx context.Context, blk *bookkeeping.Block, cert *agreement.Certificate) (err error) // StoreBlock stores a single block to the blocks database. - StoreBlock(ctx context.Context, blk *bookkeeping.Block) (err error) + StoreBlock(ctx context.Context, blk *bookkeeping.Block, cert *agreement.Certificate) (err error) // FinishBlocks concludes the catchup of the blocks database. FinishBlocks(ctx context.Context, applyChanges bool) (err error) @@ -1055,12 +1056,12 @@ func (c *catchpointCatchupAccessorImpl) StoreBalancesRound(ctx context.Context, } // StoreFirstBlock stores a single block to the blocks database. -func (c *catchpointCatchupAccessorImpl) StoreFirstBlock(ctx context.Context, blk *bookkeeping.Block) (err error) { +func (c *catchpointCatchupAccessorImpl) StoreFirstBlock(ctx context.Context, blk *bookkeeping.Block, cert *agreement.Certificate) (err error) { blockDbs := c.ledger.blockDB() start := time.Now() ledgerStorefirstblockCount.Inc(nil) err = blockDbs.Wdb.Atomic(func(ctx context.Context, tx *sql.Tx) (err error) { - return blockdb.BlockStartCatchupStaging(tx, *blk) + return blockdb.BlockStartCatchupStaging(tx, *blk, *cert) }) ledgerStorefirstblockMicros.AddMicrosecondsSince(start, nil) if err != nil { @@ -1070,12 +1071,12 @@ func (c *catchpointCatchupAccessorImpl) StoreFirstBlock(ctx context.Context, blk } // StoreBlock stores a single block to the blocks database. -func (c *catchpointCatchupAccessorImpl) StoreBlock(ctx context.Context, blk *bookkeeping.Block) (err error) { +func (c *catchpointCatchupAccessorImpl) StoreBlock(ctx context.Context, blk *bookkeeping.Block, cert *agreement.Certificate) (err error) { blockDbs := c.ledger.blockDB() start := time.Now() ledgerCatchpointStoreblockCount.Inc(nil) err = blockDbs.Wdb.Atomic(func(ctx context.Context, tx *sql.Tx) (err error) { - return blockdb.BlockPutStaging(tx, *blk) + return blockdb.BlockPutStaging(tx, *blk, *cert) }) ledgerCatchpointStoreblockMicros.AddMicrosecondsSince(start, nil) if err != nil { diff --git a/ledger/catchupaccessor_test.go b/ledger/catchupaccessor_test.go index a97377bd3e..63b0fa2420 100644 --- a/ledger/catchupaccessor_test.go +++ b/ledger/catchupaccessor_test.go @@ -28,6 +28,7 @@ import ( "github.com/stretchr/testify/require" + "github.com/algorand/go-algorand/agreement" "github.com/algorand/go-algorand/config" "github.com/algorand/go-algorand/crypto" "github.com/algorand/go-algorand/data/basics" @@ -162,7 +163,7 @@ func initializeTestCatchupAccessor(t *testing.T, l *Ledger, accountsCount uint64 require.NoError(t, err) // We do this to initialize the catchpointblocks table. Needed to be able to use CompleteCatchup. - err = catchpointAccessor.StoreFirstBlock(ctx, &bookkeeping.Block{}) + err = catchpointAccessor.StoreFirstBlock(ctx, &bookkeeping.Block{}, &agreement.Certificate{}) require.NoError(t, err) // We do this to initialize the accounttotals table. Needed to be able to use CompleteCatchup. @@ -441,6 +442,7 @@ func TestVerifyCatchpoint(t *testing.T) { // actual testing... var blk bookkeeping.Block + var cert agreement.Certificate err = catchpointAccessor.VerifyCatchpoint(ctx, &blk) require.Error(t, err) @@ -455,14 +457,14 @@ func TestVerifyCatchpoint(t *testing.T) { err = catchpointAccessor.StoreBalancesRound(ctx, &blk) require.NoError(t, err) // StoreFirstBlock is a dumb wrapper on some db logic - err = catchpointAccessor.StoreFirstBlock(ctx, &blk) + err = catchpointAccessor.StoreFirstBlock(ctx, &blk, &cert) require.NoError(t, err) _, err = catchpointAccessor.EnsureFirstBlock(ctx) require.NoError(t, err) blk.BlockHeader.Round++ - err = catchpointAccessor.StoreBlock(ctx, &blk) + err = catchpointAccessor.StoreBlock(ctx, &blk, &cert) require.NoError(t, err) // TODO: write a case with working no-err diff --git a/ledger/store/blockdb/blockdb.go b/ledger/store/blockdb/blockdb.go index 64b6f02fcc..bbd11d6c1e 100644 --- a/ledger/store/blockdb/blockdb.go +++ b/ledger/store/blockdb/blockdb.go @@ -242,7 +242,7 @@ func BlockForgetBefore(tx *sql.Tx, rnd basics.Round) error { } // BlockStartCatchupStaging initializes catchup for catchpoint -func BlockStartCatchupStaging(tx *sql.Tx, blk bookkeeping.Block) error { +func BlockStartCatchupStaging(tx *sql.Tx, blk bookkeeping.Block, cert agreement.Certificate) error { // delete the old catchpointblocks table, if there is such. for _, stmt := range blockResetExprs { stmt = strings.Replace(stmt, "blocks", "catchpointblocks", 1) @@ -262,11 +262,12 @@ func BlockStartCatchupStaging(tx *sql.Tx, blk bookkeeping.Block) error { } // insert the top entry to the blocks table. - _, err := tx.Exec("INSERT INTO catchpointblocks (rnd, proto, hdrdata, blkdata) VALUES (?, ?, ?, ?)", + _, err := tx.Exec("INSERT INTO catchpointblocks (rnd, proto, hdrdata, blkdata, certdata) VALUES (?, ?, ?, ?, ?)", blk.Round(), blk.CurrentProtocol, protocol.Encode(&blk.BlockHeader), protocol.Encode(&blk), + protocol.Encode(&cert), ) if err != nil { return err @@ -305,13 +306,14 @@ func BlockAbortCatchup(tx *sql.Tx) error { } // BlockPutStaging store a block into catchpoint staging table -func BlockPutStaging(tx *sql.Tx, blk bookkeeping.Block) (err error) { +func BlockPutStaging(tx *sql.Tx, blk bookkeeping.Block, cert agreement.Certificate) (err error) { // insert the new entry - _, err = tx.Exec("INSERT INTO catchpointblocks (rnd, proto, hdrdata, blkdata) VALUES (?, ?, ?, ?)", + _, err = tx.Exec("INSERT INTO catchpointblocks (rnd, proto, hdrdata, blkdata, certdata) VALUES (?, ?, ?, ?, ?)", blk.Round(), blk.CurrentProtocol, protocol.Encode(&blk.BlockHeader), protocol.Encode(&blk), + protocol.Encode(&cert), ) if err != nil { return err diff --git a/test/e2e-go/features/catchup/catchpointCatchup_test.go b/test/e2e-go/features/catchup/catchpointCatchup_test.go index f9f62b3d06..eb83451307 100644 --- a/test/e2e-go/features/catchup/catchpointCatchup_test.go +++ b/test/e2e-go/features/catchup/catchpointCatchup_test.go @@ -363,6 +363,10 @@ func TestBasicCatchpointCatchup(t *testing.T) { err = fixture.ClientWaitForRoundWithTimeout(usingNodeRestClient, uint64(targetCatchpointRound+1)) a.NoError(err) + + // ensure the raw block can be downloaded (including cert) + _, err = usingNodeRestClient.RawBlock(uint64(targetCatchpointRound)) + a.NoError(err) } func TestCatchpointLabelGeneration(t *testing.T) {