diff --git a/sync/evmdownloader.go b/sync/evmdownloader.go index 1de2be73..f293b3b7 100644 --- a/sync/evmdownloader.go +++ b/sync/evmdownloader.go @@ -39,8 +39,9 @@ type LogAppenderMap map[common.Hash]func(b *EVMBlock, l types.Log) error type EVMDownloader struct { syncBlockChunkSize uint64 EVMDownloaderInterface - log *log.Logger - finalizedBlockType etherman.BlockNumberFinality + log *log.Logger + finalizedBlockType etherman.BlockNumberFinality + stopDownloaderOnIterationN int } func NewEVMDownloader( @@ -101,9 +102,16 @@ func NewEVMDownloader( }, nil } +// setStopDownloaderOnIterationN sets the block number to stop the downloader (just for unittest) +func (d *EVMDownloader) setStopDownloaderOnIterationN(iteration int) { + d.stopDownloaderOnIterationN = iteration +} + func (d *EVMDownloader) Download(ctx context.Context, fromBlock uint64, downloadedCh chan EVMBlock) { lastBlock := d.WaitForNewBlocks(ctx, 0) - + toBlock := fromBlock + d.syncBlockChunkSize + iteration := 0 + reachTop := false for { select { case <-ctx.Done(): @@ -112,53 +120,68 @@ func (d *EVMDownloader) Download(ctx context.Context, fromBlock uint64, download return default: } + d.log.Debugf("range: %d to %d, last block: %d", fromBlock, toBlock, lastBlock) - toBlock := fromBlock + d.syncBlockChunkSize - if toBlock > lastBlock { - toBlock = lastBlock - } - - if fromBlock > toBlock { - d.log.Infof( - "waiting for new blocks, last block processed: %d, last block seen on L1: %d", - fromBlock-1, lastBlock, + if fromBlock > lastBlock || (reachTop && toBlock >= lastBlock) { + d.log.Debugf( + "waiting for new blocks, current range: [%d to %d], last block seen: %d", + fromBlock, toBlock, lastBlock, ) - lastBlock = d.WaitForNewBlocks(ctx, fromBlock-1) - continue - } + lastBlock = d.WaitForNewBlocks(ctx, lastBlock) + d.log.Debugf("new last block seen: %d", lastBlock) + if fromBlock-toBlock < d.syncBlockChunkSize { + toBlock = fromBlock + d.syncBlockChunkSize + } + } + reachTop = false lastFinalizedBlock, err := d.GetLastFinalizedBlock(ctx) if err != nil { d.log.Error("error getting last finalized block: ", err) continue } + // lastFinalizedBlock can't be > lastBlock + lastFinalizedBlockNumber := min(lastBlock, lastFinalizedBlock.Number.Uint64()) - lastFinalizedBlockNumber := lastFinalizedBlock.Number.Uint64() - - d.log.Infof("getting events from blocks %d to %d. lastFinalizedBlock: %d", - fromBlock, toBlock, lastFinalizedBlockNumber) - blocks := d.GetEventsByBlockRange(ctx, fromBlock, toBlock) - + requestToBlock := toBlock + if toBlock >= lastBlock { + requestToBlock = lastBlock + reachTop = true + } + d.log.Debugf("getting events from blocks [%d to %d] toBlock: %d. lastFinalizedBlock: %d lastBlock: %d", + fromBlock, requestToBlock, toBlock, lastFinalizedBlockNumber, lastBlock) + blocks := d.GetEventsByBlockRange(ctx, fromBlock, requestToBlock) + d.log.Debugf("result events from blocks [%d to %d] -> len(blocks)=%d", + fromBlock, requestToBlock, len(blocks)) if toBlock <= lastFinalizedBlockNumber { d.reportBlocks(downloadedCh, blocks, lastFinalizedBlockNumber) - fromBlock = toBlock + 1 - if blocks.Len() == 0 || blocks[blocks.Len()-1].Num < toBlock { d.reportEmptyBlock(ctx, downloadedCh, toBlock, lastFinalizedBlockNumber) } + fromBlock = toBlock + 1 + toBlock = fromBlock + d.syncBlockChunkSize } else { - d.reportBlocks(downloadedCh, blocks, lastFinalizedBlockNumber) - if blocks.Len() == 0 { - if lastFinalizedBlockNumber > fromBlock && - lastFinalizedBlockNumber-fromBlock > d.syncBlockChunkSize { - d.reportEmptyBlock(ctx, downloadedCh, fromBlock+d.syncBlockChunkSize, lastFinalizedBlockNumber) - fromBlock += d.syncBlockChunkSize + 1 + if lastFinalizedBlockNumber >= fromBlock { + emptyBlock := lastFinalizedBlockNumber + d.reportEmptyBlock(ctx, downloadedCh, emptyBlock, lastFinalizedBlockNumber) + fromBlock = emptyBlock + 1 + toBlock = fromBlock + d.syncBlockChunkSize + } else { + // Extend range until find logs or reach the last finalized block + toBlock += d.syncBlockChunkSize } } else { + d.reportBlocks(downloadedCh, blocks, lastFinalizedBlockNumber) fromBlock = blocks[blocks.Len()-1].Num + 1 + toBlock = fromBlock + d.syncBlockChunkSize } } + iteration++ + if d.stopDownloaderOnIterationN != 0 && iteration >= d.stopDownloaderOnIterationN { + d.log.Infof("stop downloader on iteration %d", iteration) + return + } } } diff --git a/sync/evmdownloader_test.go b/sync/evmdownloader_test.go index b4c600f1..fed8a8e1 100644 --- a/sync/evmdownloader_test.go +++ b/sync/evmdownloader_test.go @@ -3,6 +3,7 @@ package sync import ( "context" "errors" + "fmt" "math/big" "strconv" "testing" @@ -197,183 +198,6 @@ func generateEvent(blockNum uint32) (*types.Log, testEvent) { return log, testEvent(h) } -func TestDownload(t *testing.T) { - /* - NOTE: due to the concurrent nature of this test (the function being tested runs through a goroutine) - if the mock doesn't match, the goroutine will get stuck and the test will timeout - */ - d := NewEVMDownloaderMock(t) - downloadCh := make(chan EVMBlock, 1) - ctx := context.Background() - ctx1, cancel := context.WithCancel(ctx) - expectedBlocks := EVMBlocks{} - dwnldr, _ := NewTestDownloader(t, time.Millisecond*100) - dwnldr.EVMDownloaderInterface = d - - d.On("WaitForNewBlocks", mock.Anything, uint64(0)). - Return(uint64(1)) - - lastFinalizedBlock := &types.Header{Number: big.NewInt(1)} - createEVMBlockFn := func(header *types.Header, isSafeBlock bool) *EVMBlock { - return &EVMBlock{ - IsFinalizedBlock: isSafeBlock, - EVMBlockHeader: EVMBlockHeader{ - Num: header.Number.Uint64(), - Hash: header.Hash(), - ParentHash: header.ParentHash, - Timestamp: header.Time, - }, - } - } - - // iteration 0: - // last block is 1, download that block (no events and wait) - b0 := createEVMBlockFn(lastFinalizedBlock, true) - expectedBlocks = append(expectedBlocks, b0) - d.On("GetLastFinalizedBlock", mock.Anything).Return(lastFinalizedBlock, nil).Once() - d.On("GetEventsByBlockRange", mock.Anything, uint64(0), uint64(1)). - Return(EVMBlocks{}, false).Once() - d.On("GetBlockHeader", mock.Anything, uint64(1)).Return(b0.EVMBlockHeader, false).Once() - - // iteration 1: we have a new block, so increase to block (no events) - lastFinalizedBlock = &types.Header{Number: big.NewInt(2)} - b2 := createEVMBlockFn(lastFinalizedBlock, true) - expectedBlocks = append(expectedBlocks, b2) - d.On("WaitForNewBlocks", mock.Anything, uint64(1)). - Return(uint64(2)) - d.On("GetLastFinalizedBlock", mock.Anything).Return(lastFinalizedBlock, nil).Once() - d.On("GetEventsByBlockRange", mock.Anything, uint64(2), uint64(2)). - Return(EVMBlocks{}, false).Once() - d.On("GetBlockHeader", mock.Anything, uint64(2)).Return(b2.EVMBlockHeader, false).Once() - - // iteration 2: wait for next block to be created (jump to block 8) - d.On("WaitForNewBlocks", mock.Anything, uint64(2)). - After(time.Millisecond * 100). - Return(uint64(8)).Once() - - // iteration 3: blocks 6 and 7 have events, last finalized block is 5 - lastFinalizedBlock = &types.Header{Number: big.NewInt(5)} - b6 := &EVMBlock{ - EVMBlockHeader: EVMBlockHeader{ - Num: 6, - Hash: common.HexToHash("06"), - }, - Events: []interface{}{"06"}, - } - b7 := &EVMBlock{ - EVMBlockHeader: EVMBlockHeader{ - Num: 7, - Hash: common.HexToHash("07"), - }, - Events: []interface{}{"07"}, - } - expectedBlocks = append(expectedBlocks, b6, b7) - d.On("GetLastFinalizedBlock", mock.Anything).Return(lastFinalizedBlock, nil).Once() - d.On("GetEventsByBlockRange", mock.Anything, uint64(3), uint64(8)). - Return(EVMBlocks{b6, b7}, false) - - // iteration 4: finalized block is now block 8, report the finalized block - lastFinalizedBlock = &types.Header{Number: big.NewInt(8)} - b8 := createEVMBlockFn(lastFinalizedBlock, true) - expectedBlocks = append(expectedBlocks, b8) - d.On("GetLastFinalizedBlock", mock.Anything).Return(lastFinalizedBlock, nil).Once() - d.On("GetEventsByBlockRange", mock.Anything, uint64(8), uint64(8)). - Return(EVMBlocks{}, false) - d.On("GetBlockHeader", mock.Anything, uint64(8)).Return(b8.EVMBlockHeader, false).Once() - - // iteration 5: from block 9 to 19, no events - lastFinalizedBlock = &types.Header{Number: big.NewInt(15)} - d.On("WaitForNewBlocks", mock.Anything, uint64(8)). - After(time.Millisecond * 100). - Return(uint64(19)).Once() - d.On("GetLastFinalizedBlock", mock.Anything).Return(lastFinalizedBlock, nil).Once() - d.On("GetEventsByBlockRange", mock.Anything, uint64(9), uint64(19)). - Return(EVMBlocks{}, false) - - // iteration 6: last finalized block is now 20, no events, report empty block - d.On("GetLastFinalizedBlock", mock.Anything).Return(&types.Header{Number: big.NewInt(20)}, nil).Once() - d.On("GetEventsByBlockRange", mock.Anything, uint64(9), uint64(19)). - Return(EVMBlocks{}, false) - - d.On("WaitForNewBlocks", mock.Anything, uint64(19)). - After(time.Millisecond * 100). - Return(uint64(20)).Once() - b19 := createEVMBlockFn(&types.Header{Number: big.NewInt(19)}, true) - expectedBlocks = append(expectedBlocks, b19) - d.On("GetBlockHeader", mock.Anything, uint64(19)).Return(b19.EVMBlockHeader, false) // reporting empty finalized to block - - // iteration 8: last finalized block is 21, no events - b20 := createEVMBlockFn(&types.Header{Number: big.NewInt(20)}, true) - expectedBlocks = append(expectedBlocks, b20) - d.On("GetLastFinalizedBlock", mock.Anything).Return(&types.Header{Number: big.NewInt(21)}, nil).Once() - d.On("GetEventsByBlockRange", mock.Anything, uint64(20), uint64(20)). - Return(EVMBlocks{}, false) - d.On("GetBlockHeader", mock.Anything, uint64(20)).Return(b20.EVMBlockHeader, false) // reporting empty finalized to block - - // iteration 9: last finalized block is 22, no events - d.On("WaitForNewBlocks", mock.Anything, uint64(20)). - After(time.Millisecond * 100). - Return(uint64(21)).Once() - b21 := createEVMBlockFn(&types.Header{Number: big.NewInt(21)}, true) - expectedBlocks = append(expectedBlocks, b21) - d.On("GetLastFinalizedBlock", mock.Anything).Return(&types.Header{Number: big.NewInt(22)}, nil).Once() - d.On("GetEventsByBlockRange", mock.Anything, uint64(21), uint64(21)). - Return(EVMBlocks{}, false) - d.On("GetBlockHeader", mock.Anything, uint64(21)).Return(b21.EVMBlockHeader, false) // reporting empty finalized to block - - // iteration 10: last finalized block is 23, no events - d.On("WaitForNewBlocks", mock.Anything, uint64(21)). - After(time.Millisecond * 100). - Return(uint64(22)).Once() - b22 := createEVMBlockFn(&types.Header{Number: big.NewInt(22)}, true) - expectedBlocks = append(expectedBlocks, b22) - d.On("GetLastFinalizedBlock", mock.Anything).Return(&types.Header{Number: big.NewInt(23)}, nil).Once() - d.On("GetEventsByBlockRange", mock.Anything, uint64(22), uint64(22)). - Return(EVMBlocks{}, false) - d.On("GetBlockHeader", mock.Anything, uint64(22)).Return(b22.EVMBlockHeader, false) // reporting empty finalized to block - - // iteration 11: last finalized block is still 23, no events - d.On("WaitForNewBlocks", mock.Anything, uint64(22)). - After(time.Millisecond * 100). - Return(uint64(23)).Once() - b23 := createEVMBlockFn(&types.Header{Number: big.NewInt(23)}, true) - expectedBlocks = append(expectedBlocks, b23) - d.On("GetLastFinalizedBlock", mock.Anything).Return(&types.Header{Number: big.NewInt(23)}, nil).Once() - d.On("GetEventsByBlockRange", mock.Anything, uint64(23), uint64(23)). - Return(EVMBlocks{}, false) - d.On("GetBlockHeader", mock.Anything, uint64(23)).Return(b23.EVMBlockHeader, false) // reporting empty finalized to block - - // iteration 12: finalized block is 24, has events - d.On("WaitForNewBlocks", mock.Anything, uint64(23)). - After(time.Millisecond * 100). - Return(uint64(24)).Once() - b24 := &EVMBlock{ - EVMBlockHeader: EVMBlockHeader{ - Num: 24, - Hash: common.HexToHash("24"), - }, - Events: []interface{}{testEvent(common.HexToHash("24"))}, - } - expectedBlocks = append(expectedBlocks, b24) - d.On("GetLastFinalizedBlock", mock.Anything).Return(&types.Header{Number: big.NewInt(24)}, nil).Once() - d.On("GetEventsByBlockRange", mock.Anything, uint64(24), uint64(24)). - Return(EVMBlocks{b24}, false) - - // iteration 13: closing the downloader - d.On("WaitForNewBlocks", mock.Anything, uint64(24)).Return(uint64(25)).After(time.Millisecond * 100).Once() - - go dwnldr.Download(ctx1, 0, downloadCh) - for _, expectedBlock := range expectedBlocks { - actualBlock := <-downloadCh - log.Debugf("block %d received!", actualBlock.Num) - require.Equal(t, *expectedBlock, actualBlock) - } - log.Debug("canceling") - cancel() - _, ok := <-downloadCh - require.False(t, ok) -} - func TestWaitForNewBlocks(t *testing.T) { ctx := context.Background() d, clientMock := NewTestDownloader(t, time.Millisecond*100) @@ -487,6 +311,81 @@ func TestGetLogs(t *testing.T) { require.Equal(t, []types.Log{}, logs) } +func TestDownloadBeforeFinalized(t *testing.T) { + mockEthDownloader := NewEVMDownloaderMock(t) + + ctx := context.Background() + ctx1, cancel := context.WithCancel(ctx) + defer cancel() + + downloader, _ := NewTestDownloader(t, time.Millisecond) + downloader.EVMDownloaderInterface = mockEthDownloader + + steps := []struct { + finalizedBlock uint64 + fromBlock, toBlock uint64 + eventsReponse EVMBlocks + waitForNewBlocks bool + waitForNewBlocksRequest uint64 + waitForNewBlockReply uint64 + getBlockHeader *EVMBlockHeader + }{ + {finalizedBlock: 33, fromBlock: 1, toBlock: 11, waitForNewBlocks: true, waitForNewBlocksRequest: 0, waitForNewBlockReply: 35, getBlockHeader: &EVMBlockHeader{Num: 11}}, + {finalizedBlock: 33, fromBlock: 12, toBlock: 22, eventsReponse: EVMBlocks{createEVMBlock(t, 14, true)}, getBlockHeader: &EVMBlockHeader{Num: 22}}, + // It returns the last block of range, so it don't need to create a empty one + {finalizedBlock: 33, fromBlock: 23, toBlock: 33, eventsReponse: EVMBlocks{createEVMBlock(t, 33, true)}}, + // It reach the top of chain (block 35) + {finalizedBlock: 33, fromBlock: 34, toBlock: 35}, + // Previous iteration we reach top of chain so we need update the latest block + {finalizedBlock: 33, fromBlock: 34, toBlock: 54, waitForNewBlocks: true, waitForNewBlocksRequest: 35, waitForNewBlockReply: 60}, + // finalized block is 35, so we can reduce emit an emptyBlock and reduce the range + {finalizedBlock: 35, fromBlock: 34, toBlock: 60, getBlockHeader: &EVMBlockHeader{Num: 35}}, + {finalizedBlock: 35, fromBlock: 36, toBlock: 46}, + {finalizedBlock: 35, fromBlock: 36, toBlock: 56, eventsReponse: EVMBlocks{createEVMBlock(t, 36, false)}}, + // Block 36 is the new last block,so it reduce the range again to [37-47] + {finalizedBlock: 35, fromBlock: 37, toBlock: 47}, + {finalizedBlock: 57, fromBlock: 37, toBlock: 57, eventsReponse: EVMBlocks{createEVMBlock(t, 57, false)}}, + {finalizedBlock: 61, fromBlock: 58, toBlock: 60, eventsReponse: EVMBlocks{createEVMBlock(t, 60, false)}}, + {finalizedBlock: 61, fromBlock: 61, toBlock: 61, waitForNewBlocks: true, waitForNewBlocksRequest: 60, waitForNewBlockReply: 61, getBlockHeader: &EVMBlockHeader{Num: 61}}, + {finalizedBlock: 61, fromBlock: 62, toBlock: 62, waitForNewBlocks: true, waitForNewBlocksRequest: 61, waitForNewBlockReply: 62}, + } + for i := 0; i < len(steps); i++ { + log.Info("iteration: ", i, "------------------------------------------------") + downloadCh := make(chan EVMBlock, 100) + downloader, _ := NewTestDownloader(t, time.Millisecond) + downloader.EVMDownloaderInterface = mockEthDownloader + downloader.setStopDownloaderOnIterationN(i + 1) + expectedBlocks := EVMBlocks{} + for _, step := range steps[:i+1] { + mockEthDownloader.On("GetLastFinalizedBlock", mock.Anything).Return(&types.Header{Number: big.NewInt(int64(step.finalizedBlock))}, nil).Once() + if step.waitForNewBlocks { + mockEthDownloader.On("WaitForNewBlocks", mock.Anything, step.waitForNewBlocksRequest).Return(step.waitForNewBlockReply).Once() + } + mockEthDownloader.On("GetEventsByBlockRange", mock.Anything, step.fromBlock, step.toBlock). + Return(step.eventsReponse, false).Once() + for _, eventBlock := range step.eventsReponse { + expectedBlocks = append(expectedBlocks, eventBlock) + } + if step.getBlockHeader != nil { + log.Infof("iteration:%d : GetBlockHeader(%d) ", i, step.getBlockHeader.Num) + mockEthDownloader.On("GetBlockHeader", mock.Anything, step.getBlockHeader.Num).Return(*step.getBlockHeader, false).Once() + expectedBlocks = append(expectedBlocks, &EVMBlock{ + EVMBlockHeader: *step.getBlockHeader, + IsFinalizedBlock: step.getBlockHeader.Num <= step.finalizedBlock, + }) + } + } + downloader.Download(ctx1, 1, downloadCh) + mockEthDownloader.AssertExpectations(t) + for _, expectedBlock := range expectedBlocks { + log.Debugf("waiting block %d ", expectedBlock.Num) + actualBlock := <-downloadCh + log.Debugf("block %d received!", actualBlock.Num) + require.Equal(t, *expectedBlock, actualBlock) + } + } +} + func buildAppender() LogAppenderMap { appender := make(LogAppenderMap) appender[eventSignature] = func(b *EVMBlock, l types.Log) error { @@ -512,3 +411,16 @@ func NewTestDownloader(t *testing.T, retryPeriod time.Duration) (*EVMDownloader, require.NoError(t, err) return d, clientMock } + +func createEVMBlock(t *testing.T, num uint64, isSafeBlock bool) *EVMBlock { + t.Helper() + return &EVMBlock{ + IsFinalizedBlock: isSafeBlock, + EVMBlockHeader: EVMBlockHeader{ + Num: num, + Hash: common.HexToHash(fmt.Sprintf("0x%.2X", num)), + ParentHash: common.HexToHash(fmt.Sprintf("0x%.2X", num-1)), + Timestamp: uint64(time.Now().Unix()), + }, + } +}