From 0b97e6203b7c45b19e272fe7cb00023b3324d8d3 Mon Sep 17 00:00:00 2001 From: Jerry Date: Wed, 31 Jul 2024 20:28:22 -0700 Subject: [PATCH 1/7] Enable a sequencer to re-sequence batches --- cmd/integration/commands/flags.go | 3 +- .../commands/stage_stages_zkevm.go | 8 +- cmd/integration/commands/stages_zkevm.go | 40 ++- cmd/utils/flags.go | 10 + eth/ethconfig/config_zkevm.go | 2 + turbo/cli/default_flags.go | 2 + turbo/cli/flags_zkevm.go | 2 + zk/datastream/client/stream_client.go | 115 +------- zk/datastream/client/stream_client_test.go | 2 +- zk/datastream/server/data_stream_server.go | 73 +++++ zk/datastream/types/utils.go | 119 ++++++++ zk/stages/stage_sequence_execute.go | 126 ++++++++- zk/stages/stage_sequence_execute_state.go | 21 +- zk/stages/stage_sequence_execute_utils.go | 91 ++++++- .../stage_sequence_execute_utils_test.go | 254 ++++++++++++++++++ zk/stages/test_utils.go | 4 + zk/tx/tx.go | 2 + 17 files changed, 745 insertions(+), 129 deletions(-) create mode 100644 zk/datastream/types/utils.go diff --git a/cmd/integration/commands/flags.go b/cmd/integration/commands/flags.go index 96e43da4d4a..ecf79ff77b5 100644 --- a/cmd/integration/commands/flags.go +++ b/cmd/integration/commands/flags.go @@ -31,6 +31,7 @@ var ( pruneTBefore, pruneCBefore uint64 experiments []string chain string // Which chain to use (mainnet, rinkeby, goerli, etc.) + config string commitmentMode string commitmentTrie string @@ -49,7 +50,7 @@ func must(err error) { } func withConfig(cmd *cobra.Command) { - cmd.Flags().String("config", "", "yaml/toml config file location") + cmd.Flags().StringVar(&config, "config", "", "yaml/toml config file location") } func withMining(cmd *cobra.Command) { diff --git a/cmd/integration/commands/stage_stages_zkevm.go b/cmd/integration/commands/stage_stages_zkevm.go index d2078b02e44..85b8f72cb47 100644 --- a/cmd/integration/commands/stage_stages_zkevm.go +++ b/cmd/integration/commands/stage_stages_zkevm.go @@ -8,11 +8,8 @@ import ( common2 "github.com/gateway-fm/cdk-erigon-lib/common" "github.com/gateway-fm/cdk-erigon-lib/kv" - "github.com/ledgerwatch/erigon/core" - "github.com/ledgerwatch/erigon/eth/ethconfig" "github.com/ledgerwatch/erigon/eth/stagedsync/stages" smtdb "github.com/ledgerwatch/erigon/smt/pkg/db" - erigoncli "github.com/ledgerwatch/erigon/turbo/cli" "github.com/ledgerwatch/erigon/zk/hermez_db" "github.com/ledgerwatch/log/v3" "github.com/spf13/cobra" @@ -28,9 +25,6 @@ state_stages_zkevm --datadir=/datadirs/hermez-mainnet --unwind-batch-no=2 --chai Example: "go run ./cmd/integration state_stages_zkevm --config=... 
--verbosity=3 --unwind-batch-no=100", Run: func(cmd *cobra.Command, args []string) { ctx, _ := common2.RootContext() - ethConfig := ðconfig.Defaults - ethConfig.Genesis = core.GenesisBlockByChainName(chain) - erigoncli.ApplyFlagsForEthConfigCobra(cmd.Flags(), ethConfig) db := openDB(dbCfg(kv.ChainDB, chaindata), true) defer db.Close() @@ -101,6 +95,8 @@ func unwindZk(ctx context.Context, db kv.RwDB) error { return err } + stages.SaveStageProgress(tx, stages.HighestSeenBatchNumber, unwindBatchNo) + if err := tx.Commit(); err != nil { return err } diff --git a/cmd/integration/commands/stages_zkevm.go b/cmd/integration/commands/stages_zkevm.go index 79fbf578f2e..9322fc2e853 100644 --- a/cmd/integration/commands/stages_zkevm.go +++ b/cmd/integration/commands/stages_zkevm.go @@ -2,6 +2,12 @@ package commands import ( "context" + "encoding/json" + "math/big" + "os" + "path" + "path/filepath" + "strings" "github.com/c2h5oh/datasize" chain3 "github.com/gateway-fm/cdk-erigon-lib/chain" @@ -10,11 +16,14 @@ import ( "github.com/gateway-fm/cdk-erigon-lib/kv/kvcfg" "github.com/ledgerwatch/erigon/cmd/hack/tool/fromdb" "github.com/ledgerwatch/erigon/cmd/sentry/sentry" + "github.com/ledgerwatch/erigon/cmd/utils" "github.com/ledgerwatch/erigon/consensus" "github.com/ledgerwatch/erigon/core" + "github.com/ledgerwatch/erigon/core/types" "github.com/ledgerwatch/erigon/core/vm" "github.com/ledgerwatch/erigon/eth/ethconfig" "github.com/ledgerwatch/erigon/eth/stagedsync" + "github.com/ledgerwatch/erigon/params" "github.com/ledgerwatch/erigon/turbo/shards" stages2 "github.com/ledgerwatch/erigon/turbo/stages" "github.com/ledgerwatch/erigon/zk/sequencer" @@ -26,7 +35,36 @@ func newSyncZk(ctx context.Context, db kv.RwDB) (consensus.Engine, *vm.Config, * vmConfig := &vm.Config{} - genesis := core.GenesisBlockByChainName(chain) + var genesis *types.Genesis + + if strings.HasPrefix(chain, "dynamic") { + if config == "" { + panic("Config file is required for dynamic chain") + } + + params.DynamicChainConfigPath = filepath.Dir(config) + genesis = core.GenesisBlockByChainName(chain) + filename := path.Join(params.DynamicChainConfigPath, chain+"-conf.json") + + dConf := utils.DynamicConfig{} + + if _, err := os.Stat(filename); err == nil { + dConfBytes, err := os.ReadFile(filename) + if err != nil { + panic(err) + } + if err := json.Unmarshal(dConfBytes, &dConf); err != nil { + panic(err) + } + } + + genesis.Timestamp = dConf.Timestamp + genesis.GasLimit = dConf.GasLimit + genesis.Difficulty = big.NewInt(dConf.Difficulty) + } else { + genesis = core.GenesisBlockByChainName(chain) + } + chainConfig, genesisBlock, genesisErr := core.CommitGenesisBlock(db, genesis, "") if _, ok := genesisErr.(*chain3.ConfigCompatError); genesisErr != nil && !ok { panic(genesisErr) diff --git a/cmd/utils/flags.go b/cmd/utils/flags.go index ee8869a18ea..b435383c41d 100644 --- a/cmd/utils/flags.go +++ b/cmd/utils/flags.go @@ -491,6 +491,16 @@ var ( Usage: "Halt the sequencer on this batch number", Value: 0, } + SequencerResequence = cli.BoolFlag{ + Name: "zkevm.sequencer-resequence", + Usage: "When enabled, the sequencer will automatically resequence unseen batches stored in data stream", + Value: false, + } + SequencerResequenceStrict = cli.BoolFlag{ + Name: "zkevm.sequencer-resequence-strict", + Usage: "Strictly resequence the rolledback batches", + Value: true, + } ExecutorUrls = cli.StringFlag{ Name: "zkevm.executor-urls", Usage: "A comma separated list of grpc addresses that host executors", diff --git a/eth/ethconfig/config_zkevm.go 
b/eth/ethconfig/config_zkevm.go index c8ed00eecc9..065a69d42ff 100644 --- a/eth/ethconfig/config_zkevm.go +++ b/eth/ethconfig/config_zkevm.go @@ -35,6 +35,8 @@ type Zk struct { SequencerBatchSealTime time.Duration SequencerBatchVerificationTimeout time.Duration SequencerHaltOnBatchNumber uint64 + SequencerResequence bool + SequencerResequenceStrict bool ExecutorUrls []string ExecutorStrictMode bool ExecutorRequestTimeout time.Duration diff --git a/turbo/cli/default_flags.go b/turbo/cli/default_flags.go index a0962a8369f..5c2460cf72b 100644 --- a/turbo/cli/default_flags.go +++ b/turbo/cli/default_flags.go @@ -195,6 +195,8 @@ var DefaultFlags = []cli.Flag{ &utils.SequencerBatchSealTime, &utils.SequencerBatchVerificationTimeout, &utils.SequencerHaltOnBatchNumber, + &utils.SequencerResequence, + &utils.SequencerResequenceStrict, &utils.ExecutorUrls, &utils.ExecutorStrictMode, &utils.ExecutorRequestTimeout, diff --git a/turbo/cli/flags_zkevm.go b/turbo/cli/flags_zkevm.go index 7b527b387b5..42956b411c1 100644 --- a/turbo/cli/flags_zkevm.go +++ b/turbo/cli/flags_zkevm.go @@ -132,6 +132,8 @@ func ApplyFlagsForZkConfig(ctx *cli.Context, cfg *ethconfig.Config) { SequencerBatchSealTime: sequencerBatchSealTime, SequencerBatchVerificationTimeout: sequencerBatchVerificationTimeout, SequencerHaltOnBatchNumber: ctx.Uint64(utils.SequencerHaltOnBatchNumber.Name), + SequencerResequence: ctx.Bool(utils.SequencerResequence.Name), + SequencerResequenceStrict: ctx.Bool(utils.SequencerResequenceStrict.Name), ExecutorUrls: strings.Split(strings.ReplaceAll(ctx.String(utils.ExecutorUrls.Name), " ", ""), ","), ExecutorStrictMode: ctx.Bool(utils.ExecutorStrictMode.Name), ExecutorRequestTimeout: ctx.Duration(utils.ExecutorRequestTimeout.Name), diff --git a/zk/datastream/client/stream_client.go b/zk/datastream/client/stream_client.go index 9647c252e82..b9f62a6a89a 100644 --- a/zk/datastream/client/stream_client.go +++ b/zk/datastream/client/stream_client.go @@ -201,7 +201,7 @@ func (c *StreamClient) ExecutePerFile(bookmark *types.BookmarkProto, function fu if c.Header.TotalEntries == count { break } - file, err := c.readFileEntry() + file, err := c.NextFileEntry() if err != nil { return fmt.Errorf("error reading file entry: %v", err) } @@ -321,7 +321,7 @@ LOOP: c.conn.SetReadDeadline(time.Now().Add(c.checkTimeout)) } - fullBlock, batchStart, batchEnd, gerUpdate, batchBookmark, blockBookmark, localErr := c.readFullBlockProto() + fullBlock, batchStart, batchEnd, gerUpdate, batchBookmark, blockBookmark, localErr := types.FullBlockProto(c) if localErr != nil { err = localErr break @@ -344,7 +344,7 @@ LOOP: } if batchEnd != nil { - // this check was inside c.readFullBlockProto() but it is better to move it here + // this check was inside types.FullBlockProto(c) but it is better to move it here c.batchEndChan <- *batchEnd } @@ -378,116 +378,9 @@ func (c *StreamClient) tryReConnect() error { return err } -func (c *StreamClient) readFullBlockProto() ( - l2Block *types.FullL2Block, - batchStart *types.BatchStart, - batchEnd *types.BatchEnd, - gerUpdate *types.GerUpdate, - batchBookmark *types.BookmarkProto, - blockBookmark *types.BookmarkProto, - err error, -) { - file, err := c.readFileEntry() - if err != nil { - err = fmt.Errorf("read file entry error: %v", err) - return - } - - switch file.EntryType { - case types.BookmarkEntryType: - var bookmark *types.BookmarkProto - if bookmark, err = types.UnmarshalBookmark(file.Data); err != nil { - return - } - if bookmark.BookmarkType() == datastream.BookmarkType_BOOKMARK_TYPE_BATCH { 
- batchBookmark = bookmark - return - } else { - blockBookmark = bookmark - return - } - case types.EntryTypeGerUpdate: - if gerUpdate, err = types.DecodeGerUpdateProto(file.Data); err != nil { - return - } - log.Trace("ger update", "ger", gerUpdate) - return - case types.EntryTypeBatchStart: - if batchStart, err = types.UnmarshalBatchStart(file.Data); err != nil { - return - } - return - case types.EntryTypeBatchEnd: - if batchEnd, err = types.UnmarshalBatchEnd(file.Data); err != nil { - return - } - return - case types.EntryTypeL2Block: - if l2Block, err = types.UnmarshalL2Block(file.Data); err != nil { - return - } - - txs := []types.L2TransactionProto{} - - var innerFile *types.FileEntry - var l2Tx *types.L2TransactionProto - LOOP: - for { - if innerFile, err = c.readFileEntry(); err != nil { - return - } - - if innerFile.IsL2Tx() { - if l2Tx, err = types.UnmarshalTx(innerFile.Data); err != nil { - return - } - txs = append(txs, *l2Tx) - } else if innerFile.IsL2BlockEnd() { - var l2BlockEnd *types.L2BlockEndProto - if l2BlockEnd, err = types.UnmarshalL2BlockEnd(innerFile.Data); err != nil { - return - } - if l2BlockEnd.GetBlockNumber() != l2Block.L2BlockNumber { - err = fmt.Errorf("block end number (%d) not equal to block number (%d)", l2BlockEnd.GetBlockNumber(), l2Block.L2BlockNumber) - return - } - break LOOP - } else if innerFile.IsBookmark() { - var bookmark *types.BookmarkProto - if bookmark, err = types.UnmarshalBookmark(innerFile.Data); err != nil || bookmark == nil { - return - } - if bookmark.BookmarkType() == datastream.BookmarkType_BOOKMARK_TYPE_L2_BLOCK { - break LOOP - } else { - err = fmt.Errorf("unexpected bookmark type inside block: %v", bookmark.Type()) - return - } - } else if innerFile.IsBatchEnd() { - if batchEnd, err = types.UnmarshalBatchEnd(file.Data); err != nil { - return - } - break LOOP - } else { - err = fmt.Errorf("unexpected entry type inside a block: %d", innerFile.EntryType) - return - } - } - - l2Block.L2Txs = txs - return - case types.EntryTypeL2Tx: - err = fmt.Errorf("unexpected l2Tx out of block") - return - default: - err = fmt.Errorf("unexpected entry type: %d", file.EntryType) - return - } -} - // reads file bytes from socket and tries to parse them // returns the parsed FileEntry -func (c *StreamClient) readFileEntry() (file *types.FileEntry, err error) { +func (c *StreamClient) NextFileEntry() (file *types.FileEntry, err error) { // Read packet type packet, err := readBuffer(c.conn, 1) if err != nil { diff --git a/zk/datastream/client/stream_client_test.go b/zk/datastream/client/stream_client_test.go index f1ec21d59c2..7c6b5e14e6b 100644 --- a/zk/datastream/client/stream_client_test.go +++ b/zk/datastream/client/stream_client_test.go @@ -185,7 +185,7 @@ func Test_readFileEntry(t *testing.T) { server.Close() }() - result, err := c.readFileEntry() + result, err := c.NextFileEntry() require.Equal(t, testCase.expectedError, err) assert.DeepEqual(t, testCase.expectedResult, result) }) diff --git a/zk/datastream/server/data_stream_server.go b/zk/datastream/server/data_stream_server.go index 0d3fd1129d5..f9933008dee 100644 --- a/zk/datastream/server/data_stream_server.go +++ b/zk/datastream/server/data_stream_server.go @@ -568,3 +568,76 @@ func (srv *DataStreamServer) getLastEntryOfType(entryType datastreamer.EntryType return emtryEntry, false, nil } + +type dataStreamServerIterator struct { + stream *datastreamer.StreamServer + curEntryNum uint64 +} + +func newDataStreamServerIterator(stream *datastreamer.StreamServer, start uint64) 
*dataStreamServerIterator { + return &dataStreamServerIterator{ + stream: stream, + curEntryNum: start, + } +} + +func (it *dataStreamServerIterator) NextFileEntry() (entry *types.FileEntry, err error) { + var fileEntry datastreamer.FileEntry + fileEntry, err = it.stream.GetEntry(it.curEntryNum) + if err != nil { + return nil, err + } + + it.curEntryNum += 1 + + return &types.FileEntry{ + PacketType: uint8(fileEntry.Type), + Length: fileEntry.Length, + EntryType: types.EntryType(fileEntry.Type), + EntryNum: fileEntry.Number, + Data: fileEntry.Data, + }, nil +} + +func (srv *DataStreamServer) ReadBatches(start uint64, end uint64) ([][]*types.FullL2Block, error) { + bookmark := types.NewBookmarkProto(start, datastream.BookmarkType_BOOKMARK_TYPE_BATCH) + marshalled, err := bookmark.Marshal() + if err != nil { + return nil, err + } + + entryNum, err := srv.stream.GetBookmark(marshalled) + + if err != nil { + return nil, err + } + + iterator := newDataStreamServerIterator(srv.stream, entryNum) + + return ReadBatches(iterator, start, end) +} + +func ReadBatches(iterator types.FileEntryIterator, start uint64, end uint64) ([][]*types.FullL2Block, error) { + batches := make([][]*types.FullL2Block, end-start+1) + + for { + block, batchStart, batchEnd, _, _, _, err := types.FullBlockProto(iterator) + if err != nil { + return nil, err + } + + if batchEnd != nil && batchEnd.Number == end { + break + } + + if batchStart != nil { + batches[batchStart.Number-start] = []*types.FullL2Block{} + } + + if block != nil { + batches[block.BatchNumber-start] = append(batches[block.BatchNumber-start], block) + } + } + + return batches, nil +} diff --git a/zk/datastream/types/utils.go b/zk/datastream/types/utils.go new file mode 100644 index 00000000000..47b9ba9c2f6 --- /dev/null +++ b/zk/datastream/types/utils.go @@ -0,0 +1,119 @@ +package types + +import ( + "fmt" + + "github.com/ledgerwatch/erigon/zk/datastream/proto/github.com/0xPolygonHermez/zkevm-node/state/datastream" + "github.com/ledgerwatch/log/v3" +) + +type FileEntryIterator interface { + NextFileEntry() (*FileEntry, error) +} + +func FullBlockProto(iterator FileEntryIterator) ( + l2Block *FullL2Block, + batchStart *BatchStart, + batchEnd *BatchEnd, + gerUpdate *GerUpdate, + batchBookmark *BookmarkProto, + blockBookmark *BookmarkProto, + err error, +) { + file, err := iterator.NextFileEntry() + if err != nil { + err = fmt.Errorf("read file entry error: %v", err) + return + } + + switch file.EntryType { + case BookmarkEntryType: + var bookmark *BookmarkProto + if bookmark, err = UnmarshalBookmark(file.Data); err != nil { + return + } + if bookmark.BookmarkType() == datastream.BookmarkType_BOOKMARK_TYPE_BATCH { + batchBookmark = bookmark + return + } else { + blockBookmark = bookmark + return + } + case EntryTypeGerUpdate: + if gerUpdate, err = DecodeGerUpdateProto(file.Data); err != nil { + return + } + log.Trace("ger update", "ger", gerUpdate) + return + case EntryTypeBatchStart: + if batchStart, err = UnmarshalBatchStart(file.Data); err != nil { + return + } + return + case EntryTypeBatchEnd: + if batchEnd, err = UnmarshalBatchEnd(file.Data); err != nil { + return + } + return + case EntryTypeL2Block: + if l2Block, err = UnmarshalL2Block(file.Data); err != nil { + return + } + + txs := []L2TransactionProto{} + + var innerFile *FileEntry + var l2Tx *L2TransactionProto + LOOP: + for { + if innerFile, err = iterator.NextFileEntry(); err != nil { + return + } + + if innerFile.IsL2Tx() { + if l2Tx, err = UnmarshalTx(innerFile.Data); err != nil { + return + } 
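+				// gather the block's transactions until the stream yields the
+				// block's end marker, the next block bookmark, or the batch end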
+ txs = append(txs, *l2Tx) + } else if innerFile.IsL2BlockEnd() { + var l2BlockEnd *L2BlockEndProto + if l2BlockEnd, err = UnmarshalL2BlockEnd(innerFile.Data); err != nil { + return + } + if l2BlockEnd.GetBlockNumber() != l2Block.L2BlockNumber { + err = fmt.Errorf("block end number (%d) not equal to block number (%d)", l2BlockEnd.GetBlockNumber(), l2Block.L2BlockNumber) + return + } + break LOOP + } else if innerFile.IsBookmark() { + var bookmark *BookmarkProto + if bookmark, err = UnmarshalBookmark(innerFile.Data); err != nil || bookmark == nil { + return + } + if bookmark.BookmarkType() == datastream.BookmarkType_BOOKMARK_TYPE_L2_BLOCK { + break LOOP + } else { + err = fmt.Errorf("unexpected bookmark type inside block: %v", bookmark.Type()) + return + } + } else if innerFile.IsBatchEnd() { + if batchEnd, err = UnmarshalBatchEnd(file.Data); err != nil { + return + } + break LOOP + } else { + err = fmt.Errorf("unexpected entry type inside a block: %d", innerFile.EntryType) + return + } + } + + l2Block.L2Txs = txs + return + case EntryTypeL2Tx: + err = fmt.Errorf("unexpected l2Tx out of block") + return + default: + err = fmt.Errorf("unexpected entry type: %d", file.EntryType) + return + } +} diff --git a/zk/stages/stage_sequence_execute.go b/zk/stages/stage_sequence_execute.go index d9ac3cbabd5..f27cc96627e 100644 --- a/zk/stages/stage_sequence_execute.go +++ b/zk/stages/stage_sequence_execute.go @@ -15,6 +15,7 @@ import ( "github.com/ledgerwatch/erigon/eth/stagedsync" "github.com/ledgerwatch/erigon/eth/stagedsync/stages" "github.com/ledgerwatch/erigon/zk" + zktx "github.com/ledgerwatch/erigon/zk/tx" "github.com/ledgerwatch/erigon/zk/utils" ) @@ -25,6 +26,72 @@ func SpawnSequencingStage( cfg SequenceBlockCfg, historyCfg stagedsync.HistoryCfg, quiet bool, +) (err error) { + roTx, err := cfg.db.BeginRo(ctx) + if err != nil { + return err + } + defer roTx.Rollback() + + lastBatch, err := stages.GetStageProgress(roTx, stages.HighestSeenBatchNumber) + if err != nil { + return err + } + + highestBatchInDS, err := cfg.datastreamServer.GetHighestBatchNumber() + if err != nil { + return err + } + + if !cfg.zk.SequencerResequence || lastBatch >= highestBatchInDS { + err = sequencingStageStep(s, u, ctx, cfg, historyCfg, quiet, nil) + if err != nil { + return err + } + } else { + log.Info(fmt.Sprintf("[%s] Last batch %d is lower than highest batch in datastream %d, resequencing...", s.LogPrefix(), lastBatch, highestBatchInDS)) + + batches, err := cfg.datastreamServer.ReadBatches(lastBatch+1, highestBatchInDS) + if err != nil { + return err + } + + err = cfg.datastreamServer.UnwindToBatchStart(lastBatch + 1) + if err != nil { + return err + } + + log.Info(fmt.Sprintf("[%s] Resequence from batch %d to %d in data stream", s.LogPrefix(), lastBatch+1, highestBatchInDS)) + + for _, batch := range batches { + batchJob := NewResequenceBatchJob(batch) + subBatchCount := 0 + for batchJob.HasMoreBlockToProcess() { + if err = sequencingStageStep(s, u, ctx, cfg, historyCfg, quiet, batchJob); err != nil { + return err + } + + subBatchCount += 1 + } + + log.Info(fmt.Sprintf("[%s] Resequenced original batch %d with %d batches", s.LogPrefix(), batchJob.batchToProcess[0].BatchNumber, subBatchCount)) + if cfg.zk.SequencerResequenceStrict && subBatchCount != 1 { + return fmt.Errorf("strict mode enabled, but resequenced batch %d has %d sub-batches", batchJob.batchToProcess[0].BatchNumber, subBatchCount) + } + } + } + + return nil +} + +func sequencingStageStep( + s *stagedsync.StageState, + u stagedsync.Unwinder, + ctx 
context.Context, + cfg SequenceBlockCfg, + historyCfg stagedsync.HistoryCfg, + quiet bool, + resequenceBatchJob *ResequenceBatchJob, ) (err error) { logPrefix := s.LogPrefix() log.Info(fmt.Sprintf("[%s] Starting sequencing stage", logPrefix)) @@ -59,7 +126,7 @@ func SpawnSequencingStage( var block *types.Block runLoopBlocks := true batchContext := newBatchContext(ctx, &cfg, &historyCfg, s, sdb) - batchState := newBatchState(forkId, prepareBatchNumber(lastBatch, isLastBatchPariallyProcessed), !isLastBatchPariallyProcessed && cfg.zk.HasExecutors(), cfg.zk.L1SyncStartBlock > 0, cfg.txPool) + batchState := newBatchState(forkId, prepareBatchNumber(lastBatch, isLastBatchPariallyProcessed), !isLastBatchPariallyProcessed && cfg.zk.HasExecutors(), cfg.zk.L1SyncStartBlock > 0, cfg.txPool, resequenceBatchJob) blockDataSizeChecker := newBlockDataChecker() streamWriter := newSequencerBatchStreamWriter(batchContext, batchState, lastBatch) // using lastBatch (rather than batchState.batchNumber) is not mistake @@ -139,6 +206,13 @@ func SpawnSequencingStage( } } + if batchState.isResequence() { + if !batchState.resequenceBatchJob.HasMoreBlockToProcess() { + runLoopBlocks = false + break + } + } + l1InfoIndex, err := sdb.hermezDb.GetBlockL1InfoTreeIndex(blockNumber - 1) if err != nil { return err @@ -161,11 +235,11 @@ func SpawnSequencingStage( if err != nil { return err } - if !batchState.isAnyRecovery() && overflowOnNewBlock { + if (!batchState.isAnyRecovery() || batchState.isResequence()) && overflowOnNewBlock { break } - infoTreeIndexProgress, l1TreeUpdate, l1TreeUpdateIndex, l1BlockHash, ger, shouldWriteGerToContract, err := prepareL1AndInfoTreeRelatedStuff(sdb, batchState, header.Time) + infoTreeIndexProgress, l1TreeUpdate, l1TreeUpdateIndex, l1BlockHash, ger, shouldWriteGerToContract, err := prepareL1AndInfoTreeRelatedStuff(sdb, batchState, header.Time, cfg.zk.SequencerResequenceStrict) if err != nil { return err } @@ -207,11 +281,16 @@ func SpawnSequencingStage( if err != nil { return err } - } else if !batchState.isL1Recovery() { + } else if !batchState.isL1Recovery() && !batchState.isResequence() { batchState.blockState.transactionsForInclusion, err = getNextPoolTransactions(ctx, cfg, executionAt, batchState.forkId, batchState.yieldedTransactions) if err != nil { return err } + } else if batchState.isResequence() { + batchState.blockState.transactionsForInclusion, err = batchState.resequenceBatchJob.YieldNextBlockTransactions(zktx.DecodeTx) + if err != nil { + return err + } } if len(batchState.blockState.transactionsForInclusion) == 0 { @@ -232,6 +311,18 @@ func SpawnSequencingStage( panic("limbo transaction has already been executed once so they must not fail while re-executing") } + if batchState.isResequence() { + if cfg.zk.SequencerResequenceStrict { + return fmt.Errorf("strict mode enabled, but resequenced batch %d failed to add transaction %s: %v", batchState.batchNumber, txHash, err) + } else { + log.Warn(fmt.Sprintf("[%s] error adding transaction to batch during resequence: %v", logPrefix, err), + "hash", txHash, + "to", transaction.GetTo(), + ) + continue + } + } + // if we are in recovery just log the error as a warning. If the data is on the L1 then we should consider it as confirmed. // The executor/prover would simply skip a TX with an invalid nonce for example so we don't need to worry about that here. 
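			// Resequencing takes the opposite stance: transactions replayed from the
			// data stream are expected to succeed, so strict mode aborts on any failure.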
if batchState.isL1Recovery() { @@ -273,12 +364,39 @@ func SpawnSequencingStage( break LOOP_TRANSACTIONS } + if batchState.isResequence() && cfg.zk.SequencerResequenceStrict { + return fmt.Errorf("strict mode enabled, but resequenced batch %d overflowed counters on block %d", batchState.batchNumber, blockNumber) + } + + break LOOP_TRANSACTIONS } if err == nil { blockDataSizeChecker = &backupDataSizeChecker batchState.onAddedTransaction(transaction, receipt, execResult, effectiveGas) } + + // We will only update the processed index in resequence job if there isn't overflow + if batchState.isResequence() { + batchState.resequenceBatchJob.UpdateLastProcessedTx(txHash) + } + } + + if batchState.isResequence() { + if len(batchState.blockState.transactionsForInclusion) == 0 { + // We need to jump to the next block here if there are no transactions in current block + batchState.resequenceBatchJob.UpdateLastProcessedTx(batchState.resequenceBatchJob.CurrentBlock().L2Blockhash) + break LOOP_TRANSACTIONS + } + + if batchState.resequenceBatchJob.AtNewBlockBoundary() { + // We need to jump to the next block here if we are at the end of the current block + break LOOP_TRANSACTIONS + } else { + if cfg.zk.SequencerResequenceStrict { + return fmt.Errorf("strict mode enabled, but resequenced batch %d has transactions that overflowed counters or failed transactions", batchState.batchNumber) + } + } } if batchState.isL1Recovery() { diff --git a/zk/stages/stage_sequence_execute_state.go b/zk/stages/stage_sequence_execute_state.go index 20245564fdd..66bc835eb7e 100644 --- a/zk/stages/stage_sequence_execute_state.go +++ b/zk/stages/stage_sequence_execute_state.go @@ -44,9 +44,17 @@ type BatchState struct { blockState *BlockState batchL1RecoveryData *BatchL1RecoveryData limboRecoveryData *LimboRecoveryData + resequenceBatchJob *ResequenceBatchJob } -func newBatchState(forkId, batchNumber uint64, hasExecutorForThisBatch, l1Recovery bool, txPool *txpool.TxPool) *BatchState { +func newBatchState( + forkId, + batchNumber uint64, + hasExecutorForThisBatch, + l1Recovery bool, + txPool *txpool.TxPool, + resequenceBatchJob *ResequenceBatchJob, +) *BatchState { batchState := &BatchState{ forkId: forkId, batchNumber: batchNumber, @@ -57,6 +65,7 @@ func newBatchState(forkId, batchNumber uint64, hasExecutorForThisBatch, l1Recove blockState: newBlockState(), batchL1RecoveryData: nil, limboRecoveryData: nil, + resequenceBatchJob: resequenceBatchJob, } if l1Recovery { @@ -79,8 +88,12 @@ func (bs *BatchState) isLimboRecovery() bool { return bs.limboRecoveryData != nil } +func (bs *BatchState) isResequence() bool { + return bs.resequenceBatchJob != nil +} + func (bs *BatchState) isAnyRecovery() bool { - return bs.isL1Recovery() || bs.isLimboRecovery() + return bs.isL1Recovery() || bs.isLimboRecovery() || bs.isResequence() } func (bs *BatchState) isThereAnyTransactionsToRecover() bool { @@ -103,6 +116,10 @@ func (bs *BatchState) getBlockHeaderForcedTimestamp() uint64 { return bs.limboRecoveryData.limboHeaderTimestamp } + if bs.isResequence() { + return uint64(bs.resequenceBatchJob.CurrentBlock().Timestamp) + } + return math.MaxUint64 } diff --git a/zk/stages/stage_sequence_execute_utils.go b/zk/stages/stage_sequence_execute_utils.go index 40b9697ff3f..3501de0e5a5 100644 --- a/zk/stages/stage_sequence_execute_utils.go +++ b/zk/stages/stage_sequence_execute_utils.go @@ -29,6 +29,7 @@ import ( "github.com/ledgerwatch/erigon/turbo/shards" "github.com/ledgerwatch/erigon/turbo/stages/headerdownload" 
"github.com/ledgerwatch/erigon/zk/datastream/server" + dsTypes "github.com/ledgerwatch/erigon/zk/datastream/types" "github.com/ledgerwatch/erigon/zk/hermez_db" verifier "github.com/ledgerwatch/erigon/zk/legacy_executor_verifier" "github.com/ledgerwatch/erigon/zk/tx" @@ -239,7 +240,12 @@ func prepareHeader(tx kv.RwTx, previousBlockNumber, deltaTimestamp, forcedTimest }, parentBlock, nil } -func prepareL1AndInfoTreeRelatedStuff(sdb *stageDb, batchState *BatchState, proposedTimestamp uint64) ( +func prepareL1AndInfoTreeRelatedStuff( + sdb *stageDb, + batchState *BatchState, + proposedTimestamp uint64, + resequenceStrict bool, +) ( infoTreeIndexProgress uint64, l1TreeUpdate *zktypes.L1InfoTreeUpdate, l1TreeUpdateIndex uint64, @@ -257,8 +263,12 @@ func prepareL1AndInfoTreeRelatedStuff(sdb *stageDb, batchState *BatchState, prop return } - if batchState.isL1Recovery() { - l1TreeUpdateIndex = uint64(batchState.blockState.blockL1RecoveryData.L1InfoTreeIndex) + if batchState.isL1Recovery() || (batchState.isResequence() && resequenceStrict) { + if batchState.isL1Recovery() { + l1TreeUpdateIndex = uint64(batchState.blockState.blockL1RecoveryData.L1InfoTreeIndex) + } else { + l1TreeUpdateIndex = uint64(batchState.resequenceBatchJob.CurrentBlock().L1InfoTreeIndex) + } if l1TreeUpdate, err = sdb.hermezDb.GetL1InfoTreeUpdate(l1TreeUpdateIndex); err != nil { return } @@ -443,3 +453,78 @@ func (bdc *BlockDataChecker) AddTransactionData(txL2Data []byte) bool { return false } + +type txMatadata struct { + blockNum int + txIndex int +} + +type ResequenceBatchJob struct { + batchToProcess []*dsTypes.FullL2Block + StartBlockIndex int + StartTxIndex int + txIndexMap map[common.Hash]txMatadata +} + +func NewResequenceBatchJob(batch []*dsTypes.FullL2Block) *ResequenceBatchJob { + return &ResequenceBatchJob{ + batchToProcess: batch, + StartBlockIndex: 0, + StartTxIndex: 0, + txIndexMap: make(map[common.Hash]txMatadata), + } +} + +func (r *ResequenceBatchJob) HasMoreBlockToProcess() bool { + return r.StartBlockIndex < len(r.batchToProcess) +} + +func (r *ResequenceBatchJob) AtNewBlockBoundary() bool { + return r.StartTxIndex == 0 +} + +func (r *ResequenceBatchJob) CurrentBlock() *dsTypes.FullL2Block { + if r.HasMoreBlockToProcess() { + return r.batchToProcess[r.StartBlockIndex] + } + return nil +} + +func (r *ResequenceBatchJob) YieldNextBlockTransactions(decoder zktx.TxDecoder) ([]types.Transaction, error) { + blockTransactions := make([]types.Transaction, 0) + if r.HasMoreBlockToProcess() { + block := r.CurrentBlock() + r.txIndexMap[block.L2Blockhash] = txMatadata{r.StartBlockIndex, 0} + + for i := r.StartTxIndex; i < len(block.L2Txs); i++ { + transaction := block.L2Txs[i] + tx, _, err := decoder(transaction.Encoded, transaction.EffectiveGasPricePercentage, block.ForkId) + if err != nil { + return nil, fmt.Errorf("decode tx error: %v", err) + } + r.txIndexMap[tx.Hash()] = txMatadata{r.StartBlockIndex, i} + blockTransactions = append(blockTransactions, tx) + } + } + + return blockTransactions, nil +} + +func (r *ResequenceBatchJob) UpdateLastProcessedTx(h common.Hash) { + if idx, ok := r.txIndexMap[h]; ok { + block := r.batchToProcess[idx.blockNum] + + if idx.txIndex >= len(block.L2Txs)-1 { + // we've processed all the transactions in this block + // move to the next block + r.StartBlockIndex = idx.blockNum + 1 + r.StartTxIndex = 0 + } else { + // move to the next transaction in the block + r.StartBlockIndex = idx.blockNum + r.StartTxIndex = idx.txIndex + 1 + } + } else { + log.Warn("tx hash not found in tx index 
map", "hash", h) + } +} diff --git a/zk/stages/stage_sequence_execute_utils_test.go b/zk/stages/stage_sequence_execute_utils_test.go index 3ff72032840..f5fe9d0eb50 100644 --- a/zk/stages/stage_sequence_execute_utils_test.go +++ b/zk/stages/stage_sequence_execute_utils_test.go @@ -1,8 +1,13 @@ package stages import ( + "reflect" "testing" + "github.com/gateway-fm/cdk-erigon-lib/common" + "github.com/holiman/uint256" + "github.com/ledgerwatch/erigon/core/types" + dsTypes "github.com/ledgerwatch/erigon/zk/datastream/types" zktx "github.com/ledgerwatch/erigon/zk/tx" zktypes "github.com/ledgerwatch/erigon/zk/types" ) @@ -207,3 +212,252 @@ func Test_PrepareForkId_DuringRecovery(t *testing.T) { }) } } + +// Mock implementation of zktx.DecodeTx for testing purposes +func mockDecodeTx(encoded []byte, effectiveGasPricePercentage byte, forkId uint64) (types.Transaction, uint8, error) { + return types.NewTransaction(0, common.Address{}, uint256.NewInt(0), 0, uint256.NewInt(0), encoded), 0, nil +} + +func TestResequenceBatchJob_HasMoreToProcess(t *testing.T) { + tests := []struct { + name string + job ResequenceBatchJob + expected bool + }{ + { + name: "Has more blocks to process", + job: ResequenceBatchJob{ + batchToProcess: []*dsTypes.FullL2Block{{}, {}}, + StartBlockIndex: 1, + StartTxIndex: 0, + }, + expected: true, + }, + { + name: "Has more transactions to process", + job: ResequenceBatchJob{ + batchToProcess: []*dsTypes.FullL2Block{{L2Txs: []dsTypes.L2TransactionProto{{}, {}}}}, + StartBlockIndex: 0, + StartTxIndex: 0, + }, + expected: true, + }, + { + name: "No more to process", + job: ResequenceBatchJob{ + batchToProcess: []*dsTypes.FullL2Block{{}}, + StartBlockIndex: 1, + StartTxIndex: 0, + }, + expected: false, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + if got := tt.job.HasMoreBlockToProcess(); got != tt.expected { + t.Errorf("ResequenceBatchJob.HasMoreBlockToProcess() = %v, want %v", got, tt.expected) + } + }) + } +} + +func TestResequenceBatchJob_CurrentBlock(t *testing.T) { + tests := []struct { + name string + job ResequenceBatchJob + expected *dsTypes.FullL2Block + }{ + { + name: "Has current block", + job: ResequenceBatchJob{ + batchToProcess: []*dsTypes.FullL2Block{{L2BlockNumber: 1}, {L2BlockNumber: 2}}, + StartBlockIndex: 0, + StartTxIndex: 0, + }, + expected: &dsTypes.FullL2Block{L2BlockNumber: 1}, + }, + { + name: "No current block", + job: ResequenceBatchJob{ + batchToProcess: []*dsTypes.FullL2Block{{L2BlockNumber: 1}}, + StartBlockIndex: 1, + StartTxIndex: 0, + }, + expected: nil, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + got := tt.job.CurrentBlock() + if (got == nil && tt.expected != nil) || (got != nil && tt.expected == nil) { + t.Errorf("ResequenceBatchJob.CurrentBlock() = %v, want %v", got, tt.expected) + } + if got != nil && tt.expected != nil && got.L2BlockNumber != tt.expected.L2BlockNumber { + t.Errorf("ResequenceBatchJob.CurrentBlock().L2BlockNumber = %v, want %v", got.L2BlockNumber, tt.expected.L2BlockNumber) + } + }) + } +} + +func TestResequenceBatchJob_YieldNextBlockTransactions(t *testing.T) { + // Replace the actual zktx.DecodeTx with our mock function for testing + + tests := []struct { + name string + job ResequenceBatchJob + expectedTxCount int + expectedError bool + }{ + { + name: "Yield transactions", + job: ResequenceBatchJob{ + batchToProcess: []*dsTypes.FullL2Block{ + { + L2Txs: []dsTypes.L2TransactionProto{{}, {}}, + ForkId: 1, + }, + }, + StartBlockIndex: 0, + StartTxIndex: 0, + 
txIndexMap: make(map[common.Hash]txMatadata), + }, + expectedTxCount: 2, + expectedError: false, + }, + { + name: "No transactions to yield", + job: ResequenceBatchJob{ + batchToProcess: []*dsTypes.FullL2Block{{}}, + StartBlockIndex: 1, + StartTxIndex: 0, + txIndexMap: make(map[common.Hash]txMatadata), + }, + expectedTxCount: 0, + expectedError: false, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + txs, err := tt.job.YieldNextBlockTransactions(mockDecodeTx) + if (err != nil) != tt.expectedError { + t.Errorf("ResequenceBatchJob.YieldNextBlockTransactions() error = %v, expectedError %v", err, tt.expectedError) + return + } + if len(txs) != tt.expectedTxCount { + t.Errorf("ResequenceBatchJob.YieldNextBlockTransactions() returned %d transactions, expected %d", len(txs), tt.expectedTxCount) + } + }) + } +} + +func TestResequenceBatchJob_YieldAndUpdate(t *testing.T) { + // Setup the batch + batch := []*dsTypes.FullL2Block{ + {L2Txs: []dsTypes.L2TransactionProto{{Encoded: []byte("1")}, {Encoded: []byte("2")}}, L2Blockhash: common.HexToHash("0")}, + {L2Txs: []dsTypes.L2TransactionProto{}, L2Blockhash: common.HexToHash("1")}, + {L2Txs: []dsTypes.L2TransactionProto{}, L2Blockhash: common.HexToHash("2")}, + {L2Txs: []dsTypes.L2TransactionProto{{Encoded: []byte("3")}, {Encoded: []byte("4")}}, L2Blockhash: common.HexToHash("3")}, + } + + job := ResequenceBatchJob{ + batchToProcess: batch, + StartBlockIndex: 0, + StartTxIndex: 1, // Start at block 0, index 1 + txIndexMap: make(map[common.Hash]txMatadata), + } + + processTransactions := func(txs []types.Transaction) { + for _, tx := range txs { + job.UpdateLastProcessedTx(tx.Hash()) + } + } + + // First call - should yield transaction 2 from block 0 + txs, err := job.YieldNextBlockTransactions(mockDecodeTx) + if err != nil { + t.Fatalf("First call: Unexpected error: %v", err) + } + if len(txs) != 1 || string(txs[0].GetData()) != "2" { + t.Errorf("Expected 1 transaction with data '2', got %d transactions with data '%s'", len(txs), string(txs[0].GetData())) + } + processTransactions(txs) + tx2 := txs[0] + + // Second call - should yield empty block (block 1) + txs, err = job.YieldNextBlockTransactions(mockDecodeTx) + if err != nil { + t.Fatalf("Second call: Unexpected error: %v", err) + } + if len(txs) != 0 { + t.Errorf("Expected 0 transactions, got %d", len(txs)) + } + job.UpdateLastProcessedTx(job.CurrentBlock().L2Blockhash) + + // Third call - should yield empty block (block 2) + txs, err = job.YieldNextBlockTransactions(mockDecodeTx) + if err != nil { + t.Fatalf("Third call: Unexpected error: %v", err) + } + if len(txs) != 0 { + t.Errorf("Expected 0 transactions, got %d", len(txs)) + } + job.UpdateLastProcessedTx(job.CurrentBlock().L2Blockhash) + + // Fourth call - should yield transactions 3 and 4, but we'll only process 3 + txs, err = job.YieldNextBlockTransactions(mockDecodeTx) + if err != nil { + t.Fatalf("Fourth call: Unexpected error: %v", err) + } + if len(txs) != 2 || string(txs[0].GetData()) != "3" || string(txs[1].GetData()) != "4" { + t.Errorf("Expected 2 transactions with data '3' and '4', got %d transactions", len(txs)) + } + processTransactions(txs[:1]) // Only process the first transaction (3) + tx3 := txs[0] + tx4 := txs[1] + + // Check final state + if job.StartBlockIndex != 3 { + t.Errorf("Expected StartBlockIndex to be 3, got %d", job.StartBlockIndex) + } + + if job.StartTxIndex != 1 { + t.Errorf("Expected StartTxIndex to be 1, got %d", job.StartTxIndex) + } + + // Final call - should yield 
transaction 4 + txs, err = job.YieldNextBlockTransactions(mockDecodeTx) + if err != nil { + t.Fatalf("Final call: Unexpected error: %v", err) + } + if len(txs) != 1 || string(txs[0].GetData()) != "4" { + t.Errorf("Expected 1 transaction with data '4', got %d transactions", len(txs)) + } + + processTransactions(txs) + + if job.HasMoreBlockToProcess() { + t.Errorf("Expected no more blocks to process") + } + + // Verify txIndexMap + expectedTxIndexMap := map[common.Hash]txMatadata{ + common.HexToHash("0"): {0, 0}, + common.HexToHash("1"): {1, 0}, + common.HexToHash("2"): {2, 0}, + common.HexToHash("3"): {3, 0}, + tx2.Hash(): {0, 1}, // Transaction 2 + tx3.Hash(): {3, 0}, // Transaction 3 + tx4.Hash(): {3, 1}, // Transaction 4 + } + + for hash, index := range expectedTxIndexMap { + if actualIndex, exists := job.txIndexMap[hash]; !exists { + t.Errorf("Expected hash %s to exist in txIndexMap", hash.Hex()) + } else if !reflect.DeepEqual(actualIndex, index) { + t.Errorf("For hash %s, expected index %v, got %v", hash.Hex(), index, actualIndex) + } + } +} diff --git a/zk/stages/test_utils.go b/zk/stages/test_utils.go index 276a89ea47b..3f04b98c39b 100644 --- a/zk/stages/test_utils.go +++ b/zk/stages/test_utils.go @@ -83,3 +83,7 @@ func (c *TestDatastreamClient) GetStreamingAtomic() *atomic.Bool { func (c *TestDatastreamClient) GetProgressAtomic() *atomic.Uint64 { return &c.progress } + +func (c *TestDatastreamClient) ReadBatches(start uint64, end uint64) ([][]*types.FullL2Block, error) { + return nil, nil +} diff --git a/zk/tx/tx.go b/zk/tx/tx.go index 9b0dfe072ba..afa1769092f 100644 --- a/zk/tx/tx.go +++ b/zk/tx/tx.go @@ -189,6 +189,8 @@ func DecodeBatchL2Blocks(txsData []byte, forkID uint64) ([]DecodedBatchL2Data, e return result, nil } +type TxDecoder func(encodedTx []byte, gasPricePercentage uint8, forkID uint64) (types.Transaction, uint8, error) + func DecodeTx(encodedTx []byte, efficiencyPercentage byte, forkId uint64) (types.Transaction, uint8, error) { // efficiencyPercentage := uint8(0) if forkId >= uint64(constants.ForkID5Dragonfruit) { From e2a6df556237987145f9edeb5848d551f1811aea Mon Sep 17 00:00:00 2001 From: Jerry Date: Thu, 8 Aug 2024 12:41:25 -0700 Subject: [PATCH 2/7] Stop producing blocks after resequencing completes --- zk/stages/stage_sequence_execute.go | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/zk/stages/stage_sequence_execute.go b/zk/stages/stage_sequence_execute.go index f27cc96627e..1e7943312a9 100644 --- a/zk/stages/stage_sequence_execute.go +++ b/zk/stages/stage_sequence_execute.go @@ -44,6 +44,12 @@ func SpawnSequencingStage( } if !cfg.zk.SequencerResequence || lastBatch >= highestBatchInDS { + if cfg.zk.SequencerResequence { + log.Info(fmt.Sprintf("[%s] Resequencing completed. 
Please restart sequencer without resequence flag.", s.LogPrefix())) + time.Sleep(10 * time.Second) + return nil + } + err = sequencingStageStep(s, u, ctx, cfg, historyCfg, quiet, nil) if err != nil { return err From 1a110cfff53badb5137e737dc96d563b6f92dd1a Mon Sep 17 00:00:00 2001 From: Jerry Date: Thu, 8 Aug 2024 16:02:49 -0700 Subject: [PATCH 3/7] Introduce flag reuseL1InfoIndex --- cmd/utils/flags.go | 5 +++++ eth/ethconfig/config_zkevm.go | 1 + turbo/cli/default_flags.go | 1 + turbo/cli/flags_zkevm.go | 1 + zk/stages/stage_sequence_execute.go | 2 +- zk/stages/stage_sequence_execute_utils.go | 15 ++++++++++++--- 6 files changed, 21 insertions(+), 4 deletions(-) diff --git a/cmd/utils/flags.go b/cmd/utils/flags.go index b435383c41d..dc4a455c351 100644 --- a/cmd/utils/flags.go +++ b/cmd/utils/flags.go @@ -501,6 +501,11 @@ var ( Usage: "Strictly resequence the rolledback batches", Value: true, } + SequencerResequenceReuseL1InfoIndex = cli.BoolFlag{ + Name: "zkevm.sequencer-resequence-reuse-l1-info-index", + Usage: "Reuse the L1 info index for resequencing", + Value: true, + } ExecutorUrls = cli.StringFlag{ Name: "zkevm.executor-urls", Usage: "A comma separated list of grpc addresses that host executors", diff --git a/eth/ethconfig/config_zkevm.go b/eth/ethconfig/config_zkevm.go index 065a69d42ff..041e677dfbc 100644 --- a/eth/ethconfig/config_zkevm.go +++ b/eth/ethconfig/config_zkevm.go @@ -37,6 +37,7 @@ type Zk struct { SequencerHaltOnBatchNumber uint64 SequencerResequence bool SequencerResequenceStrict bool + SequencerResequenceReuseL1InfoIndex bool ExecutorUrls []string ExecutorStrictMode bool ExecutorRequestTimeout time.Duration diff --git a/turbo/cli/default_flags.go b/turbo/cli/default_flags.go index 5c2460cf72b..fa9b55babbb 100644 --- a/turbo/cli/default_flags.go +++ b/turbo/cli/default_flags.go @@ -197,6 +197,7 @@ var DefaultFlags = []cli.Flag{ &utils.SequencerHaltOnBatchNumber, &utils.SequencerResequence, &utils.SequencerResequenceStrict, + &utils.SequencerResequenceReuseL1InfoIndex, &utils.ExecutorUrls, &utils.ExecutorStrictMode, &utils.ExecutorRequestTimeout, diff --git a/turbo/cli/flags_zkevm.go b/turbo/cli/flags_zkevm.go index 42956b411c1..5ee9623d25e 100644 --- a/turbo/cli/flags_zkevm.go +++ b/turbo/cli/flags_zkevm.go @@ -134,6 +134,7 @@ func ApplyFlagsForZkConfig(ctx *cli.Context, cfg *ethconfig.Config) { SequencerHaltOnBatchNumber: ctx.Uint64(utils.SequencerHaltOnBatchNumber.Name), SequencerResequence: ctx.Bool(utils.SequencerResequence.Name), SequencerResequenceStrict: ctx.Bool(utils.SequencerResequenceStrict.Name), + SequencerResequenceReuseL1InfoIndex: ctx.Bool(utils.SequencerResequenceReuseL1InfoIndex.Name), ExecutorUrls: strings.Split(strings.ReplaceAll(ctx.String(utils.ExecutorUrls.Name), " ", ""), ","), ExecutorStrictMode: ctx.Bool(utils.ExecutorStrictMode.Name), ExecutorRequestTimeout: ctx.Duration(utils.ExecutorRequestTimeout.Name), diff --git a/zk/stages/stage_sequence_execute.go b/zk/stages/stage_sequence_execute.go index 1e7943312a9..194b5cc6284 100644 --- a/zk/stages/stage_sequence_execute.go +++ b/zk/stages/stage_sequence_execute.go @@ -245,7 +245,7 @@ func sequencingStageStep( break } - infoTreeIndexProgress, l1TreeUpdate, l1TreeUpdateIndex, l1BlockHash, ger, shouldWriteGerToContract, err := prepareL1AndInfoTreeRelatedStuff(sdb, batchState, header.Time, cfg.zk.SequencerResequenceStrict) + infoTreeIndexProgress, l1TreeUpdate, l1TreeUpdateIndex, l1BlockHash, ger, shouldWriteGerToContract, err := prepareL1AndInfoTreeRelatedStuff(sdb, batchState, header.Time, 
cfg.zk.SequencerResequenceReuseL1InfoIndex)
 if err != nil {
 return err
 }
diff --git a/zk/stages/stage_sequence_execute_utils.go b/zk/stages/stage_sequence_execute_utils.go
index 3501de0e5a5..203a55ae884 100644
--- a/zk/stages/stage_sequence_execute_utils.go
+++ b/zk/stages/stage_sequence_execute_utils.go
@@ -244,7 +244,7 @@ func prepareL1AndInfoTreeRelatedStuff(
 sdb *stageDb,
 batchState *BatchState,
 proposedTimestamp uint64,
- resequenceStrict bool,
+ reuseL1InfoIndex bool,
 ) (
 infoTreeIndexProgress uint64,
 l1TreeUpdate *zktypes.L1InfoTreeUpdate,
@@ -263,11 +263,16 @@ func prepareL1AndInfoTreeRelatedStuff(
 return
 }

- if batchState.isL1Recovery() || (batchState.isResequence() && resequenceStrict) {
+ if batchState.isL1Recovery() || (batchState.isResequence() && reuseL1InfoIndex) {
 if batchState.isL1Recovery() {
 l1TreeUpdateIndex = uint64(batchState.blockState.blockL1RecoveryData.L1InfoTreeIndex)
 } else {
- l1TreeUpdateIndex = uint64(batchState.resequenceBatchJob.CurrentBlock().L1InfoTreeIndex)
+ // Resequence mode:
+ // If we are resequencing at the start of a rolled-back block (AtNewBlockBoundary() is true), we reuse the l1TreeUpdateIndex of that block.
+ // If we are in the middle of a block (AtNewBlockBoundary() is false), the original block is being resequenced into multiple blocks, so we leave l1TreeUpdateIndex as 0 for the remaining blocks.
+ if batchState.resequenceBatchJob.AtNewBlockBoundary() {
+ l1TreeUpdateIndex = uint64(batchState.resequenceBatchJob.CurrentBlock().L1InfoTreeIndex)
+ }
 }
 if l1TreeUpdate, err = sdb.hermezDb.GetL1InfoTreeUpdate(l1TreeUpdateIndex); err != nil {
 return
@@ -290,6 +295,10 @@
 ger = l1TreeUpdate.GER
 }

+ if batchState.isResequence() && l1TreeUpdateIndex == 0 {
+ shouldWriteGerToContract = false
+ }
+
 return
 }

From 234144c8b81f66aac5d4c8f84d3ee71992ccd23e Mon Sep 17 00:00:00 2001
From: Jerry
Date: Tue, 27 Aug 2024 11:29:27 -0700
Subject: [PATCH 4/7] Move ReadParsedProto back to data stream client in order
 to reduce merge conflicts

---
 zk/datastream/client/stream_client.go      | 89 +++++++++++++++++++-
 zk/datastream/server/data_stream_server.go |  5 +-
 zk/datastream/types/utils.go               | 94 ----------------------
 3 files changed, 91 insertions(+), 97 deletions(-)
 delete mode 100644 zk/datastream/types/utils.go

diff --git a/zk/datastream/client/stream_client.go b/zk/datastream/client/stream_client.go
index 8b0993e0bd9..98cc50109bf 100644
--- a/zk/datastream/client/stream_client.go
+++ b/zk/datastream/client/stream_client.go
@@ -304,7 +304,7 @@ LOOP:
 c.conn.SetReadDeadline(time.Now().Add(c.checkTimeout))
 }

- parsedProto, localErr := types.ReadParsedProto(c)
+ parsedProto, localErr := ReadParsedProto(c)
 if localErr != nil {
 err = localErr
 break
@@ -353,6 +353,93 @@ func (c *StreamClient) tryReConnect() error {
 return err
 }

+type FileEntryIterator interface {
+ NextFileEntry() (*types.FileEntry, error)
+}
+
+func ReadParsedProto(iterator FileEntryIterator) (
+ parsedEntry interface{},
+ err error,
+) {
+ file, err := iterator.NextFileEntry()
+ if err != nil {
+ err = fmt.Errorf("read file entry error: %v", err)
+ return
+ }
+
+ switch file.EntryType {
+ case types.BookmarkEntryType:
+ parsedEntry, err = types.UnmarshalBookmark(file.Data)
+ case types.EntryTypeGerUpdate:
+ parsedEntry, err = types.DecodeGerUpdateProto(file.Data)
+ case types.EntryTypeBatchStart:
+ parsedEntry, err = types.UnmarshalBatchStart(file.Data)
+ case types.EntryTypeBatchEnd:
+ parsedEntry, err = types.UnmarshalBatchEnd(file.Data)
+ case 
types.EntryTypeL2Block: + var l2Block *types.FullL2Block + if l2Block, err = types.UnmarshalL2Block(file.Data); err != nil { + return + } + + txs := []types.L2TransactionProto{} + + var innerFile *types.FileEntry + var l2Tx *types.L2TransactionProto + LOOP: + for { + if innerFile, err = iterator.NextFileEntry(); err != nil { + return + } + + if innerFile.IsL2Tx() { + if l2Tx, err = types.UnmarshalTx(innerFile.Data); err != nil { + return + } + txs = append(txs, *l2Tx) + } else if innerFile.IsL2BlockEnd() { + var l2BlockEnd *types.L2BlockEndProto + if l2BlockEnd, err = types.UnmarshalL2BlockEnd(innerFile.Data); err != nil { + return + } + if l2BlockEnd.GetBlockNumber() != l2Block.L2BlockNumber { + err = fmt.Errorf("block end number (%d) not equal to block number (%d)", l2BlockEnd.GetBlockNumber(), l2Block.L2BlockNumber) + return + } + break LOOP + } else if innerFile.IsBookmark() { + var bookmark *types.BookmarkProto + if bookmark, err = types.UnmarshalBookmark(innerFile.Data); err != nil || bookmark == nil { + return + } + if bookmark.BookmarkType() == datastream.BookmarkType_BOOKMARK_TYPE_L2_BLOCK { + break LOOP + } else { + err = fmt.Errorf("unexpected bookmark type inside block: %v", bookmark.Type()) + return + } + } else if innerFile.IsBatchEnd() { + if _, err = types.UnmarshalBatchEnd(file.Data); err != nil { + return + } + break LOOP + } else { + err = fmt.Errorf("unexpected entry type inside a block: %d", innerFile.EntryType) + return + } + } + + l2Block.L2Txs = txs + parsedEntry = l2Block + return + case types.EntryTypeL2Tx: + err = fmt.Errorf("unexpected l2Tx out of block") + default: + err = fmt.Errorf("unexpected entry type: %d", file.EntryType) + } + return +} + // reads file bytes from socket and tries to parse them // returns the parsed FileEntry func (c *StreamClient) NextFileEntry() (file *types.FileEntry, err error) { diff --git a/zk/datastream/server/data_stream_server.go b/zk/datastream/server/data_stream_server.go index 6af9ef0a496..b0f0419b63b 100644 --- a/zk/datastream/server/data_stream_server.go +++ b/zk/datastream/server/data_stream_server.go @@ -11,6 +11,7 @@ import ( "github.com/gateway-fm/cdk-erigon-lib/kv" "github.com/ledgerwatch/erigon/core/rawdb" eritypes "github.com/ledgerwatch/erigon/core/types" + "github.com/ledgerwatch/erigon/zk/datastream/client" "github.com/ledgerwatch/erigon/zk/datastream/proto/github.com/0xPolygonHermez/zkevm-node/state/datastream" "github.com/ledgerwatch/erigon/zk/datastream/types" ) @@ -617,12 +618,12 @@ func (srv *DataStreamServer) ReadBatches(start uint64, end uint64) ([][]*types.F return ReadBatches(iterator, start, end) } -func ReadBatches(iterator types.FileEntryIterator, start uint64, end uint64) ([][]*types.FullL2Block, error) { +func ReadBatches(iterator client.FileEntryIterator, start uint64, end uint64) ([][]*types.FullL2Block, error) { batches := make([][]*types.FullL2Block, end-start+1) LOOP_ENTRIES: for { - parsedProto, err := types.ReadParsedProto(iterator) + parsedProto, err := client.ReadParsedProto(iterator) if err != nil { return nil, err } diff --git a/zk/datastream/types/utils.go b/zk/datastream/types/utils.go deleted file mode 100644 index 75b86452513..00000000000 --- a/zk/datastream/types/utils.go +++ /dev/null @@ -1,94 +0,0 @@ -package types - -import ( - "fmt" - - "github.com/ledgerwatch/erigon/zk/datastream/proto/github.com/0xPolygonHermez/zkevm-node/state/datastream" -) - -type FileEntryIterator interface { - NextFileEntry() (*FileEntry, error) -} - -func ReadParsedProto(iterator FileEntryIterator) ( - 
parsedEntry interface{}, - err error, -) { - file, err := iterator.NextFileEntry() - if err != nil { - err = fmt.Errorf("read file entry error: %v", err) - return - } - - switch file.EntryType { - case BookmarkEntryType: - parsedEntry, err = UnmarshalBookmark(file.Data) - case EntryTypeGerUpdate: - parsedEntry, err = DecodeGerUpdateProto(file.Data) - case EntryTypeBatchStart: - parsedEntry, err = UnmarshalBatchStart(file.Data) - case EntryTypeBatchEnd: - parsedEntry, err = UnmarshalBatchEnd(file.Data) - case EntryTypeL2Block: - var l2Block *FullL2Block - if l2Block, err = UnmarshalL2Block(file.Data); err != nil { - return - } - - txs := []L2TransactionProto{} - - var innerFile *FileEntry - var l2Tx *L2TransactionProto - LOOP: - for { - if innerFile, err = iterator.NextFileEntry(); err != nil { - return - } - - if innerFile.IsL2Tx() { - if l2Tx, err = UnmarshalTx(innerFile.Data); err != nil { - return - } - txs = append(txs, *l2Tx) - } else if innerFile.IsL2BlockEnd() { - var l2BlockEnd *L2BlockEndProto - if l2BlockEnd, err = UnmarshalL2BlockEnd(innerFile.Data); err != nil { - return - } - if l2BlockEnd.GetBlockNumber() != l2Block.L2BlockNumber { - err = fmt.Errorf("block end number (%d) not equal to block number (%d)", l2BlockEnd.GetBlockNumber(), l2Block.L2BlockNumber) - return - } - break LOOP - } else if innerFile.IsBookmark() { - var bookmark *BookmarkProto - if bookmark, err = UnmarshalBookmark(innerFile.Data); err != nil || bookmark == nil { - return - } - if bookmark.BookmarkType() == datastream.BookmarkType_BOOKMARK_TYPE_L2_BLOCK { - break LOOP - } else { - err = fmt.Errorf("unexpected bookmark type inside block: %v", bookmark.Type()) - return - } - } else if innerFile.IsBatchEnd() { - if _, err = UnmarshalBatchEnd(file.Data); err != nil { - return - } - break LOOP - } else { - err = fmt.Errorf("unexpected entry type inside a block: %d", innerFile.EntryType) - return - } - } - - l2Block.L2Txs = txs - parsedEntry = l2Block - return - case EntryTypeL2Tx: - err = fmt.Errorf("unexpected l2Tx out of block") - default: - err = fmt.Errorf("unexpected entry type: %d", file.EntryType) - } - return -} From a979f1be0364379c6a3000551048d66b2acd0511 Mon Sep 17 00:00:00 2001 From: Jerry Date: Wed, 28 Aug 2024 15:28:07 -0700 Subject: [PATCH 5/7] Skip iteration when the last batch is incomplete --- zk/datastream/client/stream_client.go | 4 ++++ zk/datastream/server/data_stream_server.go | 10 ++++++++++ 2 files changed, 14 insertions(+) diff --git a/zk/datastream/client/stream_client.go b/zk/datastream/client/stream_client.go index 98cc50109bf..e1a0fbd9cbc 100644 --- a/zk/datastream/client/stream_client.go +++ b/zk/datastream/client/stream_client.go @@ -367,6 +367,10 @@ func ReadParsedProto(iterator FileEntryIterator) ( return } + if file == nil { + return nil, nil + } + switch file.EntryType { case types.BookmarkEntryType: parsedEntry, err = types.UnmarshalBookmark(file.Data) diff --git a/zk/datastream/server/data_stream_server.go b/zk/datastream/server/data_stream_server.go index 6dbc1c83a98..0eea1bfc726 100644 --- a/zk/datastream/server/data_stream_server.go +++ b/zk/datastream/server/data_stream_server.go @@ -572,16 +572,22 @@ func (srv *DataStreamServer) getLastEntryOfType(entryType datastreamer.EntryType type dataStreamServerIterator struct { stream *datastreamer.StreamServer curEntryNum uint64 + header uint64 } func newDataStreamServerIterator(stream *datastreamer.StreamServer, start uint64) *dataStreamServerIterator { return &dataStreamServerIterator{ stream: stream, curEntryNum: start, + 
header: stream.GetHeader().TotalEntries - 1, } } func (it *dataStreamServerIterator) NextFileEntry() (entry *types.FileEntry, err error) { + if it.curEntryNum > it.header { + return nil, nil + } + var fileEntry datastreamer.FileEntry fileEntry, err = it.stream.GetEntry(it.curEntryNum) if err != nil { @@ -627,6 +633,10 @@ LOOP_ENTRIES: return nil, err } + if parsedProto == nil { + break + } + switch parsedProto := parsedProto.(type) { case *types.BatchStart: batches[parsedProto.Number-start] = []*types.FullL2Block{} From ed27eccb50291ba4e110f7d2cccd557e12af5dc5 Mon Sep 17 00:00:00 2001 From: Jerry Date: Fri, 13 Sep 2024 16:17:03 -0700 Subject: [PATCH 6/7] Wait for pending verifications during resequencing --- zk/legacy_executor_verifier/legacy_executor_verifier.go | 7 +++++++ zk/stages/stage_sequence_execute.go | 5 +++++ 2 files changed, 12 insertions(+) diff --git a/zk/legacy_executor_verifier/legacy_executor_verifier.go b/zk/legacy_executor_verifier/legacy_executor_verifier.go index a69df6ee52a..6a69ddad2c5 100644 --- a/zk/legacy_executor_verifier/legacy_executor_verifier.go +++ b/zk/legacy_executor_verifier/legacy_executor_verifier.go @@ -315,6 +315,13 @@ func (v *LegacyExecutorVerifier) VerifyWithoutExecutor(request *VerifierRequest) return promise } +func (v *LegacyExecutorVerifier) HasPendingVerifications() bool { + v.mtxPromises.Lock() + defer v.mtxPromises.Unlock() + + return len(v.promises) > 0 +} + func (v *LegacyExecutorVerifier) ProcessResultsSequentially(logPrefix string) ([]*VerifierBundle, error) { v.mtxPromises.Lock() defer v.mtxPromises.Unlock() diff --git a/zk/stages/stage_sequence_execute.go b/zk/stages/stage_sequence_execute.go index 90a16083a06..4c87b9a2074 100644 --- a/zk/stages/stage_sequence_execute.go +++ b/zk/stages/stage_sequence_execute.go @@ -251,6 +251,11 @@ func sequencingStageStep( if batchState.isResequence() { if !batchState.resequenceBatchJob.HasMoreBlockToProcess() { + for streamWriter.legacyVerifier.HasPendingVerifications() { + streamWriter.CommitNewUpdates() + time.Sleep(1 * time.Second) + } + runLoopBlocks = false break } From bccd57fff11cc8c2bb8eee0a87e86e2ad49dfcb9 Mon Sep 17 00:00:00 2001 From: Jerry Date: Thu, 19 Sep 2024 08:40:30 -0700 Subject: [PATCH 7/7] Address PR comments --- cmd/integration/commands/stage_stages_zkevm.go | 2 -- zk/stages/stage_sequence_execute_utils.go | 4 ---- 2 files changed, 6 deletions(-) diff --git a/cmd/integration/commands/stage_stages_zkevm.go b/cmd/integration/commands/stage_stages_zkevm.go index 85b8f72cb47..607072bb231 100644 --- a/cmd/integration/commands/stage_stages_zkevm.go +++ b/cmd/integration/commands/stage_stages_zkevm.go @@ -95,8 +95,6 @@ func unwindZk(ctx context.Context, db kv.RwDB) error { return err } - stages.SaveStageProgress(tx, stages.HighestSeenBatchNumber, unwindBatchNo) - if err := tx.Commit(); err != nil { return err } diff --git a/zk/stages/stage_sequence_execute_utils.go b/zk/stages/stage_sequence_execute_utils.go index 70ada3a32b6..7f7e89d0a01 100644 --- a/zk/stages/stage_sequence_execute_utils.go +++ b/zk/stages/stage_sequence_execute_utils.go @@ -297,10 +297,6 @@ func prepareL1AndInfoTreeRelatedStuff( ger = l1TreeUpdate.GER } - if batchState.isResequence() && l1TreeUpdateIndex == 0 { - shouldWriteGerToContract = false - } - return }
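
--
Usage sketch for the series (the datadir/config paths, chain name, batch
number, and JSON keys below are illustrative assumptions, not values fixed
by the patches):

PATCH 1/7 teaches the integration tool to load an optional <chain>-conf.json
for "dynamic" chains from the directory of the --config file. Assuming the
JSON keys mirror the utils.DynamicConfig fields applied to the genesis block
in newSyncZk, a minimal file could be:

    {
        "timestamp": 1700000000,
        "gasLimit": 30000000,
        "difficulty": 0
    }

A rollback-and-resequence cycle with the new flags (flag names as defined in
PATCH 1/7 and PATCH 3/7) could then look like:

    # roll the sequencer back to the last good batch
    go run ./cmd/integration state_stages_zkevm \
        --datadir=/path/to/datadir --config=/path/to/config.yaml \
        --chain=dynamic-mynetwork --unwind-batch-no=100

    # restart with resequencing enabled; once PATCH 2/7 logs
    # "Resequencing completed", restart without the resequence flags
    cdk-erigon --config=/path/to/config.yaml \
        --zkevm.sequencer-resequence=true \
        --zkevm.sequencer-resequence-strict=true \
        --zkevm.sequencer-resequence-reuse-l1-info-index=true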