diff --git a/cmd/integration/commands/flags.go b/cmd/integration/commands/flags.go index 96e43da4d4a..ecf79ff77b5 100644 --- a/cmd/integration/commands/flags.go +++ b/cmd/integration/commands/flags.go @@ -31,6 +31,7 @@ var ( pruneTBefore, pruneCBefore uint64 experiments []string chain string // Which chain to use (mainnet, rinkeby, goerli, etc.) + config string commitmentMode string commitmentTrie string @@ -49,7 +50,7 @@ func must(err error) { } func withConfig(cmd *cobra.Command) { - cmd.Flags().String("config", "", "yaml/toml config file location") + cmd.Flags().StringVar(&config, "config", "", "yaml/toml config file location") } func withMining(cmd *cobra.Command) { diff --git a/cmd/integration/commands/stage_stages_zkevm.go b/cmd/integration/commands/stage_stages_zkevm.go index d2078b02e44..607072bb231 100644 --- a/cmd/integration/commands/stage_stages_zkevm.go +++ b/cmd/integration/commands/stage_stages_zkevm.go @@ -8,11 +8,8 @@ import ( common2 "github.com/gateway-fm/cdk-erigon-lib/common" "github.com/gateway-fm/cdk-erigon-lib/kv" - "github.com/ledgerwatch/erigon/core" - "github.com/ledgerwatch/erigon/eth/ethconfig" "github.com/ledgerwatch/erigon/eth/stagedsync/stages" smtdb "github.com/ledgerwatch/erigon/smt/pkg/db" - erigoncli "github.com/ledgerwatch/erigon/turbo/cli" "github.com/ledgerwatch/erigon/zk/hermez_db" "github.com/ledgerwatch/log/v3" "github.com/spf13/cobra" @@ -28,9 +25,6 @@ state_stages_zkevm --datadir=/datadirs/hermez-mainnet --unwind-batch-no=2 --chai Example: "go run ./cmd/integration state_stages_zkevm --config=... --verbosity=3 --unwind-batch-no=100", Run: func(cmd *cobra.Command, args []string) { ctx, _ := common2.RootContext() - ethConfig := ðconfig.Defaults - ethConfig.Genesis = core.GenesisBlockByChainName(chain) - erigoncli.ApplyFlagsForEthConfigCobra(cmd.Flags(), ethConfig) db := openDB(dbCfg(kv.ChainDB, chaindata), true) defer db.Close() diff --git a/cmd/integration/commands/stages_zkevm.go b/cmd/integration/commands/stages_zkevm.go index 79fbf578f2e..9322fc2e853 100644 --- a/cmd/integration/commands/stages_zkevm.go +++ b/cmd/integration/commands/stages_zkevm.go @@ -2,6 +2,12 @@ package commands import ( "context" + "encoding/json" + "math/big" + "os" + "path" + "path/filepath" + "strings" "github.com/c2h5oh/datasize" chain3 "github.com/gateway-fm/cdk-erigon-lib/chain" @@ -10,11 +16,14 @@ import ( "github.com/gateway-fm/cdk-erigon-lib/kv/kvcfg" "github.com/ledgerwatch/erigon/cmd/hack/tool/fromdb" "github.com/ledgerwatch/erigon/cmd/sentry/sentry" + "github.com/ledgerwatch/erigon/cmd/utils" "github.com/ledgerwatch/erigon/consensus" "github.com/ledgerwatch/erigon/core" + "github.com/ledgerwatch/erigon/core/types" "github.com/ledgerwatch/erigon/core/vm" "github.com/ledgerwatch/erigon/eth/ethconfig" "github.com/ledgerwatch/erigon/eth/stagedsync" + "github.com/ledgerwatch/erigon/params" "github.com/ledgerwatch/erigon/turbo/shards" stages2 "github.com/ledgerwatch/erigon/turbo/stages" "github.com/ledgerwatch/erigon/zk/sequencer" @@ -26,7 +35,36 @@ func newSyncZk(ctx context.Context, db kv.RwDB) (consensus.Engine, *vm.Config, * vmConfig := &vm.Config{} - genesis := core.GenesisBlockByChainName(chain) + var genesis *types.Genesis + + if strings.HasPrefix(chain, "dynamic") { + if config == "" { + panic("Config file is required for dynamic chain") + } + + params.DynamicChainConfigPath = filepath.Dir(config) + genesis = core.GenesisBlockByChainName(chain) + filename := path.Join(params.DynamicChainConfigPath, chain+"-conf.json") + + dConf := utils.DynamicConfig{} + + if 
_, err := os.Stat(filename); err == nil { + dConfBytes, err := os.ReadFile(filename) + if err != nil { + panic(err) + } + if err := json.Unmarshal(dConfBytes, &dConf); err != nil { + panic(err) + } + } + + genesis.Timestamp = dConf.Timestamp + genesis.GasLimit = dConf.GasLimit + genesis.Difficulty = big.NewInt(dConf.Difficulty) + } else { + genesis = core.GenesisBlockByChainName(chain) + } + chainConfig, genesisBlock, genesisErr := core.CommitGenesisBlock(db, genesis, "") if _, ok := genesisErr.(*chain3.ConfigCompatError); genesisErr != nil && !ok { panic(genesisErr) diff --git a/cmd/utils/flags.go b/cmd/utils/flags.go index 5a131834253..f774766230e 100644 --- a/cmd/utils/flags.go +++ b/cmd/utils/flags.go @@ -506,6 +506,21 @@ var ( Usage: "Halt the sequencer on this batch number", Value: 0, } + SequencerResequence = cli.BoolFlag{ + Name: "zkevm.sequencer-resequence", + Usage: "When enabled, the sequencer will automatically resequence unseen batches stored in data stream", + Value: false, + } + SequencerResequenceStrict = cli.BoolFlag{ + Name: "zkevm.sequencer-resequence-strict", + Usage: "Strictly resequence the rolledback batches", + Value: true, + } + SequencerResequenceReuseL1InfoIndex = cli.BoolFlag{ + Name: "zkevm.sequencer-resequence-reuse-l1-info-index", + Usage: "Reuse the L1 info index for resequencing", + Value: true, + } ExecutorUrls = cli.StringFlag{ Name: "zkevm.executor-urls", Usage: "A comma separated list of grpc addresses that host executors", diff --git a/eth/ethconfig/config_zkevm.go b/eth/ethconfig/config_zkevm.go index 113a541c339..32de04302b2 100644 --- a/eth/ethconfig/config_zkevm.go +++ b/eth/ethconfig/config_zkevm.go @@ -39,6 +39,9 @@ type Zk struct { SequencerBatchVerificationTimeout time.Duration SequencerTimeoutOnEmptyTxPool time.Duration SequencerHaltOnBatchNumber uint64 + SequencerResequence bool + SequencerResequenceStrict bool + SequencerResequenceReuseL1InfoIndex bool ExecutorUrls []string ExecutorStrictMode bool ExecutorRequestTimeout time.Duration diff --git a/turbo/cli/default_flags.go b/turbo/cli/default_flags.go index 9b048e5d5d2..5d81f0414c2 100644 --- a/turbo/cli/default_flags.go +++ b/turbo/cli/default_flags.go @@ -199,6 +199,9 @@ var DefaultFlags = []cli.Flag{ &utils.SequencerBatchVerificationTimeout, &utils.SequencerTimeoutOnEmptyTxPool, &utils.SequencerHaltOnBatchNumber, + &utils.SequencerResequence, + &utils.SequencerResequenceStrict, + &utils.SequencerResequenceReuseL1InfoIndex, &utils.ExecutorUrls, &utils.ExecutorStrictMode, &utils.ExecutorRequestTimeout, diff --git a/turbo/cli/flags_zkevm.go b/turbo/cli/flags_zkevm.go index 8c442dfb856..73e571a8037 100644 --- a/turbo/cli/flags_zkevm.go +++ b/turbo/cli/flags_zkevm.go @@ -142,6 +142,9 @@ func ApplyFlagsForZkConfig(ctx *cli.Context, cfg *ethconfig.Config) { SequencerBatchVerificationTimeout: sequencerBatchVerificationTimeout, SequencerTimeoutOnEmptyTxPool: sequencerTimeoutOnEmptyTxPool, SequencerHaltOnBatchNumber: ctx.Uint64(utils.SequencerHaltOnBatchNumber.Name), + SequencerResequence: ctx.Bool(utils.SequencerResequence.Name), + SequencerResequenceStrict: ctx.Bool(utils.SequencerResequenceStrict.Name), + SequencerResequenceReuseL1InfoIndex: ctx.Bool(utils.SequencerResequenceReuseL1InfoIndex.Name), ExecutorUrls: strings.Split(strings.ReplaceAll(ctx.String(utils.ExecutorUrls.Name), " ", ""), ","), ExecutorStrictMode: ctx.Bool(utils.ExecutorStrictMode.Name), ExecutorRequestTimeout: ctx.Duration(utils.ExecutorRequestTimeout.Name), diff --git a/zk/datastream/client/stream_client.go 
b/zk/datastream/client/stream_client.go index f7362743284..e1a0fbd9cbc 100644 --- a/zk/datastream/client/stream_client.go +++ b/zk/datastream/client/stream_client.go @@ -184,7 +184,7 @@ func (c *StreamClient) ExecutePerFile(bookmark *types.BookmarkProto, function fu if c.Header.TotalEntries == count { break } - file, err := c.readFileEntry() + file, err := c.NextFileEntry() if err != nil { return fmt.Errorf("reading file entry: %v", err) } @@ -304,7 +304,7 @@ LOOP: c.conn.SetReadDeadline(time.Now().Add(c.checkTimeout)) } - parsedProto, localErr := c.readParsedProto() + parsedProto, localErr := ReadParsedProto(c) if localErr != nil { err = localErr break @@ -353,16 +353,24 @@ func (c *StreamClient) tryReConnect() error { return err } -func (c *StreamClient) readParsedProto() ( +type FileEntryIterator interface { + NextFileEntry() (*types.FileEntry, error) +} + +func ReadParsedProto(iterator FileEntryIterator) ( parsedEntry interface{}, err error, ) { - file, err := c.readFileEntry() + file, err := iterator.NextFileEntry() if err != nil { err = fmt.Errorf("read file entry error: %v", err) return } + if file == nil { + return nil, nil + } + switch file.EntryType { case types.BookmarkEntryType: parsedEntry, err = types.UnmarshalBookmark(file.Data) @@ -384,7 +392,7 @@ func (c *StreamClient) readParsedProto() ( var l2Tx *types.L2TransactionProto LOOP: for { - if innerFile, err = c.readFileEntry(); err != nil { + if innerFile, err = iterator.NextFileEntry(); err != nil { return } @@ -438,7 +446,7 @@ func (c *StreamClient) readParsedProto() ( // reads file bytes from socket and tries to parse them // returns the parsed FileEntry -func (c *StreamClient) readFileEntry() (file *types.FileEntry, err error) { +func (c *StreamClient) NextFileEntry() (file *types.FileEntry, err error) { // Read packet type packet, err := readBuffer(c.conn, 1) if err != nil { diff --git a/zk/datastream/client/stream_client_test.go b/zk/datastream/client/stream_client_test.go index 026879aa424..66a1338dbb1 100644 --- a/zk/datastream/client/stream_client_test.go +++ b/zk/datastream/client/stream_client_test.go @@ -185,7 +185,7 @@ func Test_readFileEntry(t *testing.T) { server.Close() }() - result, err := c.readFileEntry() + result, err := c.NextFileEntry() require.Equal(t, testCase.expectedError, err) assert.DeepEqual(t, testCase.expectedResult, result) }) diff --git a/zk/datastream/server/data_stream_server.go b/zk/datastream/server/data_stream_server.go index e8f06f9177d..5968c5e19cb 100644 --- a/zk/datastream/server/data_stream_server.go +++ b/zk/datastream/server/data_stream_server.go @@ -12,6 +12,7 @@ import ( "github.com/gateway-fm/cdk-erigon-lib/kv" "github.com/ledgerwatch/erigon/core/rawdb" eritypes "github.com/ledgerwatch/erigon/core/types" + "github.com/ledgerwatch/erigon/zk/datastream/client" "github.com/ledgerwatch/erigon/zk/datastream/proto/github.com/0xPolygonHermez/zkevm-node/state/datastream" "github.com/ledgerwatch/erigon/zk/datastream/types" ) @@ -599,3 +600,88 @@ func (srv *DataStreamServer) getLastEntryOfType(entryType datastreamer.EntryType return emtryEntry, false, nil } + +type dataStreamServerIterator struct { + stream *datastreamer.StreamServer + curEntryNum uint64 + header uint64 +} + +func newDataStreamServerIterator(stream *datastreamer.StreamServer, start uint64) *dataStreamServerIterator { + return &dataStreamServerIterator{ + stream: stream, + curEntryNum: start, + header: stream.GetHeader().TotalEntries - 1, + } +} + +func (it *dataStreamServerIterator) NextFileEntry() (entry 
*types.FileEntry, err error) { + if it.curEntryNum > it.header { + return nil, nil + } + + var fileEntry datastreamer.FileEntry + fileEntry, err = it.stream.GetEntry(it.curEntryNum) + if err != nil { + return nil, err + } + + it.curEntryNum += 1 + + return &types.FileEntry{ + PacketType: uint8(fileEntry.Type), + Length: fileEntry.Length, + EntryType: types.EntryType(fileEntry.Type), + EntryNum: fileEntry.Number, + Data: fileEntry.Data, + }, nil +} + +func (srv *DataStreamServer) ReadBatches(start uint64, end uint64) ([][]*types.FullL2Block, error) { + bookmark := types.NewBookmarkProto(start, datastream.BookmarkType_BOOKMARK_TYPE_BATCH) + marshalled, err := bookmark.Marshal() + if err != nil { + return nil, err + } + + entryNum, err := srv.stream.GetBookmark(marshalled) + + if err != nil { + return nil, err + } + + iterator := newDataStreamServerIterator(srv.stream, entryNum) + + return ReadBatches(iterator, start, end) +} + +func ReadBatches(iterator client.FileEntryIterator, start uint64, end uint64) ([][]*types.FullL2Block, error) { + batches := make([][]*types.FullL2Block, end-start+1) + +LOOP_ENTRIES: + for { + parsedProto, err := client.ReadParsedProto(iterator) + if err != nil { + return nil, err + } + + if parsedProto == nil { + break + } + + switch parsedProto := parsedProto.(type) { + case *types.BatchStart: + batches[parsedProto.Number-start] = []*types.FullL2Block{} + case *types.BatchEnd: + if parsedProto.Number == end { + break LOOP_ENTRIES + } + case *types.FullL2Block: + batches[parsedProto.BatchNumber-start] = append(batches[parsedProto.BatchNumber-start], parsedProto) + default: + continue + } + } + + return batches, nil +} diff --git a/zk/legacy_executor_verifier/legacy_executor_verifier.go b/zk/legacy_executor_verifier/legacy_executor_verifier.go index 404b8314641..9b081b0c01e 100644 --- a/zk/legacy_executor_verifier/legacy_executor_verifier.go +++ b/zk/legacy_executor_verifier/legacy_executor_verifier.go @@ -315,6 +315,13 @@ func (v *LegacyExecutorVerifier) VerifyWithoutExecutor(request *VerifierRequest) return promise } +func (v *LegacyExecutorVerifier) HasPendingVerifications() bool { + v.mtxPromises.Lock() + defer v.mtxPromises.Unlock() + + return len(v.promises) > 0 +} + func (v *LegacyExecutorVerifier) ProcessResultsSequentially(logPrefix string) ([]*VerifierBundle, error) { v.mtxPromises.Lock() defer v.mtxPromises.Unlock() diff --git a/zk/stages/stage_sequence_execute.go b/zk/stages/stage_sequence_execute.go index 3a3acc42116..473757af7e9 100644 --- a/zk/stages/stage_sequence_execute.go +++ b/zk/stages/stage_sequence_execute.go @@ -15,6 +15,7 @@ import ( "github.com/ledgerwatch/erigon/eth/stagedsync" "github.com/ledgerwatch/erigon/eth/stagedsync/stages" "github.com/ledgerwatch/erigon/zk" + zktx "github.com/ledgerwatch/erigon/zk/tx" "github.com/ledgerwatch/erigon/zk/utils" ) @@ -28,6 +29,78 @@ func SpawnSequencingStage( cfg SequenceBlockCfg, historyCfg stagedsync.HistoryCfg, quiet bool, +) (err error) { + roTx, err := cfg.db.BeginRo(ctx) + if err != nil { + return err + } + defer roTx.Rollback() + + lastBatch, err := stages.GetStageProgress(roTx, stages.HighestSeenBatchNumber) + if err != nil { + return err + } + + highestBatchInDS, err := cfg.datastreamServer.GetHighestBatchNumber() + if err != nil { + return err + } + + if !cfg.zk.SequencerResequence || lastBatch >= highestBatchInDS { + if cfg.zk.SequencerResequence { + log.Info(fmt.Sprintf("[%s] Resequencing completed. 
Please restart sequencer without resequence flag.", s.LogPrefix())) + time.Sleep(10 * time.Second) + return nil + } + + err = sequencingStageStep(s, u, ctx, cfg, historyCfg, quiet, nil) + if err != nil { + return err + } + } else { + log.Info(fmt.Sprintf("[%s] Last batch %d is lower than highest batch in datastream %d, resequencing...", s.LogPrefix(), lastBatch, highestBatchInDS)) + + batches, err := cfg.datastreamServer.ReadBatches(lastBatch+1, highestBatchInDS) + if err != nil { + return err + } + + err = cfg.datastreamServer.UnwindToBatchStart(lastBatch + 1) + if err != nil { + return err + } + + log.Info(fmt.Sprintf("[%s] Resequence from batch %d to %d in data stream", s.LogPrefix(), lastBatch+1, highestBatchInDS)) + + for _, batch := range batches { + batchJob := NewResequenceBatchJob(batch) + subBatchCount := 0 + for batchJob.HasMoreBlockToProcess() { + if err = sequencingStageStep(s, u, ctx, cfg, historyCfg, quiet, batchJob); err != nil { + return err + } + + subBatchCount += 1 + } + + log.Info(fmt.Sprintf("[%s] Resequenced original batch %d with %d batches", s.LogPrefix(), batchJob.batchToProcess[0].BatchNumber, subBatchCount)) + if cfg.zk.SequencerResequenceStrict && subBatchCount != 1 { + return fmt.Errorf("strict mode enabled, but resequenced batch %d has %d sub-batches", batchJob.batchToProcess[0].BatchNumber, subBatchCount) + } + } + } + + return nil +} + +func sequencingStageStep( + s *stagedsync.StageState, + u stagedsync.Unwinder, + ctx context.Context, + cfg SequenceBlockCfg, + historyCfg stagedsync.HistoryCfg, + quiet bool, + resequenceBatchJob *ResequenceBatchJob, ) (err error) { logPrefix := s.LogPrefix() log.Info(fmt.Sprintf("[%s] Starting sequencing stage", logPrefix)) @@ -69,7 +142,7 @@ func SpawnSequencingStage( var block *types.Block runLoopBlocks := true batchContext := newBatchContext(ctx, &cfg, &historyCfg, s, sdb) - batchState := newBatchState(forkId, batchNumberForStateInitialization, executionAt+1, cfg.zk.HasExecutors(), cfg.zk.L1SyncStartBlock > 0, cfg.txPool) + batchState := newBatchState(forkId, batchNumberForStateInitialization, executionAt+1, cfg.zk.HasExecutors(), cfg.zk.L1SyncStartBlock > 0, cfg.txPool, resequenceBatchJob) blockDataSizeChecker := newBlockDataChecker() streamWriter := newSequencerBatchStreamWriter(batchContext, batchState) @@ -176,6 +249,18 @@ func SpawnSequencingStage( } } + if batchState.isResequence() { + if !batchState.resequenceBatchJob.HasMoreBlockToProcess() { + for streamWriter.legacyVerifier.HasPendingVerifications() { + streamWriter.CommitNewUpdates() + time.Sleep(1 * time.Second) + } + + runLoopBlocks = false + break + } + } + header, parentBlock, err := prepareHeader(sdb.tx, blockNumber-1, batchState.blockState.getDeltaTimestamp(), batchState.getBlockHeaderForcedTimestamp(), batchState.forkId, batchState.getCoinbase(&cfg)) if err != nil { return err @@ -189,7 +274,7 @@ func SpawnSequencingStage( // timer: evm + smt t := utils.StartTimer("stage_sequence_execute", "evm", "smt") - infoTreeIndexProgress, l1TreeUpdate, l1TreeUpdateIndex, l1BlockHash, ger, shouldWriteGerToContract, err := prepareL1AndInfoTreeRelatedStuff(sdb, batchState, header.Time) + infoTreeIndexProgress, l1TreeUpdate, l1TreeUpdateIndex, l1BlockHash, ger, shouldWriteGerToContract, err := prepareL1AndInfoTreeRelatedStuff(sdb, batchState, header.Time, cfg.zk.SequencerResequenceReuseL1InfoIndex) if err != nil { return err } @@ -198,7 +283,7 @@ func SpawnSequencingStage( if err != nil { return err } - if !batchState.isAnyRecovery() && overflowOnNewBlock { + if 
(!batchState.isAnyRecovery() || batchState.isResequence()) && overflowOnNewBlock { break } @@ -240,7 +325,7 @@ func SpawnSequencingStage( if err != nil { return err } - } else if !batchState.isL1Recovery() { + } else if !batchState.isL1Recovery() && !batchState.isResequence() { var allConditionsOK bool batchState.blockState.transactionsForInclusion, allConditionsOK, err = getNextPoolTransactions(ctx, cfg, executionAt, batchState.forkId, batchState.yieldedTransactions) if err != nil { @@ -256,6 +341,17 @@ func SpawnSequencingStage( } else { log.Trace(fmt.Sprintf("[%s] Yielded transactions from the pool", logPrefix), "txCount", len(batchState.blockState.transactionsForInclusion)) } + } else if batchState.isResequence() { + batchState.blockState.transactionsForInclusion, err = batchState.resequenceBatchJob.YieldNextBlockTransactions(zktx.DecodeTx) + if err != nil { + return err + } + } + + if len(batchState.blockState.transactionsForInclusion) == 0 { + time.Sleep(batchContext.cfg.zk.SequencerTimeoutOnEmptyTxPool) + } else { + log.Trace(fmt.Sprintf("[%s] Yielded transactions from the pool", logPrefix), "txCount", len(batchState.blockState.transactionsForInclusion)) } for i, transaction := range batchState.blockState.transactionsForInclusion { @@ -270,6 +366,18 @@ func SpawnSequencingStage( panic("limbo transaction has already been executed once so they must not fail while re-executing") } + if batchState.isResequence() { + if cfg.zk.SequencerResequenceStrict { + return fmt.Errorf("strict mode enabled, but resequenced batch %d failed to add transaction %s: %v", batchState.batchNumber, txHash, err) + } else { + log.Warn(fmt.Sprintf("[%s] error adding transaction to batch during resequence: %v", logPrefix, err), + "hash", txHash, + "to", transaction.GetTo(), + ) + continue + } + } + // if we are in recovery just log the error as a warning. If the data is on the L1 then we should consider it as confirmed. // The executor/prover would simply skip a TX with an invalid nonce for example so we don't need to worry about that here. 
 				if batchState.isL1Recovery() {
@@ -311,12 +419,39 @@ func SpawnSequencingStage(
 					break LOOP_TRANSACTIONS
 				}
+
+				if batchState.isResequence() && cfg.zk.SequencerResequenceStrict {
+					return fmt.Errorf("strict mode enabled, but resequenced batch %d overflowed counters on block %d", batchState.batchNumber, blockNumber)
+				}
+
+				break LOOP_TRANSACTIONS
 			}

 			if err == nil {
 				blockDataSizeChecker = &backupDataSizeChecker
 				batchState.onAddedTransaction(transaction, receipt, execResult, effectiveGas)
 			}
+
+			// Only update the processed index in the resequence job if there was no overflow
+			if batchState.isResequence() {
+				batchState.resequenceBatchJob.UpdateLastProcessedTx(txHash)
+			}
+		}
+
+		if batchState.isResequence() {
+			if len(batchState.blockState.transactionsForInclusion) == 0 {
+				// We need to jump to the next block here if there are no transactions in the current block
+				batchState.resequenceBatchJob.UpdateLastProcessedTx(batchState.resequenceBatchJob.CurrentBlock().L2Blockhash)
+				break LOOP_TRANSACTIONS
+			}
+
+			if batchState.resequenceBatchJob.AtNewBlockBoundary() {
+				// We need to jump to the next block here if we are at the end of the current block
+				break LOOP_TRANSACTIONS
+			} else {
+				if cfg.zk.SequencerResequenceStrict {
+					return fmt.Errorf("strict mode enabled, but resequenced batch %d has transactions that overflowed counters or failed", batchState.batchNumber)
+				}
+			}
 		}

 		if batchState.isL1Recovery() {
diff --git a/zk/stages/stage_sequence_execute_state.go b/zk/stages/stage_sequence_execute_state.go
index 30da82ad928..19db7dd212e 100644
--- a/zk/stages/stage_sequence_execute_state.go
+++ b/zk/stages/stage_sequence_execute_state.go
@@ -44,9 +44,10 @@ type BatchState struct {
 	blockState          *BlockState
 	batchL1RecoveryData *BatchL1RecoveryData
 	limboRecoveryData   *LimboRecoveryData
+	resequenceBatchJob  *ResequenceBatchJob
 }

-func newBatchState(forkId, batchNumber, blockNumber uint64, hasExecutorForThisBatch, l1Recovery bool, txPool *txpool.TxPool) *BatchState {
+func newBatchState(forkId, batchNumber, blockNumber uint64, hasExecutorForThisBatch, l1Recovery bool, txPool *txpool.TxPool, resequenceBatchJob *ResequenceBatchJob) *BatchState {
 	batchState := &BatchState{
 		forkId:      forkId,
 		batchNumber: batchNumber,
@@ -57,6 +58,7 @@ func newBatchState(forkId, batchNumber, blockNumber uint64, hasExecutorForThisBa
 		blockState:          newBlockState(),
 		batchL1RecoveryData: nil,
 		limboRecoveryData:   nil,
+		resequenceBatchJob:  resequenceBatchJob,
 	}

 	if batchNumber != injectedBatchBatchNumber { // process injected batch regularly, no matter if it is in any recovery
@@ -95,8 +97,12 @@ func (bs *BatchState) isLimboRecovery() bool {
 	return bs.limboRecoveryData != nil
 }

+func (bs *BatchState) isResequence() bool {
+	return bs.resequenceBatchJob != nil
+}
+
 func (bs *BatchState) isAnyRecovery() bool {
-	return bs.isL1Recovery() || bs.isLimboRecovery()
+	return bs.isL1Recovery() || bs.isLimboRecovery() || bs.isResequence()
 }

 func (bs *BatchState) isThereAnyTransactionsToRecover() bool {
@@ -119,6 +125,10 @@ func (bs *BatchState) getBlockHeaderForcedTimestamp() uint64 {
 		return bs.limboRecoveryData.limboHeaderTimestamp
 	}

+	if bs.isResequence() {
+		return uint64(bs.resequenceBatchJob.CurrentBlock().Timestamp)
+	}
+
 	return math.MaxUint64
 }
diff --git a/zk/stages/stage_sequence_execute_utils.go b/zk/stages/stage_sequence_execute_utils.go
index 1ca4ad8be43..7f7e89d0a01 100644
--- a/zk/stages/stage_sequence_execute_utils.go
+++ b/zk/stages/stage_sequence_execute_utils.go
@@ -29,6 +29,7 @@ import (
 	"github.com/ledgerwatch/erigon/turbo/shards"
 	"github.com/ledgerwatch/erigon/turbo/stages/headerdownload"
 	"github.com/ledgerwatch/erigon/zk/datastream/server"
+	dsTypes "github.com/ledgerwatch/erigon/zk/datastream/types"
 	"github.com/ledgerwatch/erigon/zk/hermez_db"
 	verifier "github.com/ledgerwatch/erigon/zk/legacy_executor_verifier"
 	"github.com/ledgerwatch/erigon/zk/tx"
@@ -240,7 +241,12 @@ func prepareHeader(tx kv.RwTx, previousBlockNumber, deltaTimestamp, forcedTimest
 	}, parentBlock, nil
 }

-func prepareL1AndInfoTreeRelatedStuff(sdb *stageDb, batchState *BatchState, proposedTimestamp uint64) (
+func prepareL1AndInfoTreeRelatedStuff(
+	sdb *stageDb,
+	batchState *BatchState,
+	proposedTimestamp uint64,
+	reuseL1InfoIndex bool,
+) (
 	infoTreeIndexProgress uint64,
 	l1TreeUpdate *zktypes.L1InfoTreeUpdate,
 	l1TreeUpdateIndex uint64,
@@ -258,8 +264,17 @@ func prepareL1AndInfoTreeRelatedStuff(sdb *stageDb, batchState *BatchState, prop
 		return
 	}

-	if batchState.isL1Recovery() {
-		l1TreeUpdateIndex = uint64(batchState.blockState.blockL1RecoveryData.L1InfoTreeIndex)
+	if batchState.isL1Recovery() || (batchState.isResequence() && reuseL1InfoIndex) {
+		if batchState.isL1Recovery() {
+			l1TreeUpdateIndex = uint64(batchState.blockState.blockL1RecoveryData.L1InfoTreeIndex)
+		} else {
+			// Resequence mode:
+			// If we are resequencing at the beginning (AtNewBlockBoundary -> true) of a rolled-back block, we need to reuse the l1TreeUpdateIndex from that block.
+			// If we are in the middle of a block (AtNewBlockBoundary -> false), it means the original block will be resequenced into multiple blocks, so we leave l1TreeUpdateIndex as 0 for the rest of the blocks.
+			if batchState.resequenceBatchJob.AtNewBlockBoundary() {
+				l1TreeUpdateIndex = uint64(batchState.resequenceBatchJob.CurrentBlock().L1InfoTreeIndex)
+			}
+		}
 		if l1TreeUpdate, err = sdb.hermezDb.GetL1InfoTreeUpdate(l1TreeUpdateIndex); err != nil {
 			return
 		}
@@ -489,3 +504,78 @@ func (bdc *BlockDataChecker) AddTransactionData(txL2Data []byte) bool {

 	return false
 }
+
+type txMatadata struct {
+	blockNum int
+	txIndex  int
+}
+
+type ResequenceBatchJob struct {
+	batchToProcess  []*dsTypes.FullL2Block
+	StartBlockIndex int
+	StartTxIndex    int
+	txIndexMap      map[common.Hash]txMatadata
+}
+
+func NewResequenceBatchJob(batch []*dsTypes.FullL2Block) *ResequenceBatchJob {
+	return &ResequenceBatchJob{
+		batchToProcess:  batch,
+		StartBlockIndex: 0,
+		StartTxIndex:    0,
+		txIndexMap:      make(map[common.Hash]txMatadata),
+	}
+}
+
+func (r *ResequenceBatchJob) HasMoreBlockToProcess() bool {
+	return r.StartBlockIndex < len(r.batchToProcess)
+}
+
+func (r *ResequenceBatchJob) AtNewBlockBoundary() bool {
+	return r.StartTxIndex == 0
+}
+
+func (r *ResequenceBatchJob) CurrentBlock() *dsTypes.FullL2Block {
+	if r.HasMoreBlockToProcess() {
+		return r.batchToProcess[r.StartBlockIndex]
+	}
+	return nil
+}
+
+func (r *ResequenceBatchJob) YieldNextBlockTransactions(decoder zktx.TxDecoder) ([]types.Transaction, error) {
+	blockTransactions := make([]types.Transaction, 0)
+	if r.HasMoreBlockToProcess() {
+		block := r.CurrentBlock()
+		r.txIndexMap[block.L2Blockhash] = txMatadata{r.StartBlockIndex, 0}
+
+		for i := r.StartTxIndex; i < len(block.L2Txs); i++ {
+			transaction := block.L2Txs[i]
+			tx, _, err := decoder(transaction.Encoded, transaction.EffectiveGasPricePercentage, block.ForkId)
+			if err != nil {
+				return nil, fmt.Errorf("decode tx error: %v", err)
+			}
+			r.txIndexMap[tx.Hash()] = txMatadata{r.StartBlockIndex, i}
+			blockTransactions = append(blockTransactions, tx)
+		}
+	}
+
+	return
blockTransactions, nil +} + +func (r *ResequenceBatchJob) UpdateLastProcessedTx(h common.Hash) { + if idx, ok := r.txIndexMap[h]; ok { + block := r.batchToProcess[idx.blockNum] + + if idx.txIndex >= len(block.L2Txs)-1 { + // we've processed all the transactions in this block + // move to the next block + r.StartBlockIndex = idx.blockNum + 1 + r.StartTxIndex = 0 + } else { + // move to the next transaction in the block + r.StartBlockIndex = idx.blockNum + r.StartTxIndex = idx.txIndex + 1 + } + } else { + log.Warn("tx hash not found in tx index map", "hash", h) + } +} diff --git a/zk/stages/stage_sequence_execute_utils_test.go b/zk/stages/stage_sequence_execute_utils_test.go index 3ff72032840..f5fe9d0eb50 100644 --- a/zk/stages/stage_sequence_execute_utils_test.go +++ b/zk/stages/stage_sequence_execute_utils_test.go @@ -1,8 +1,13 @@ package stages import ( + "reflect" "testing" + "github.com/gateway-fm/cdk-erigon-lib/common" + "github.com/holiman/uint256" + "github.com/ledgerwatch/erigon/core/types" + dsTypes "github.com/ledgerwatch/erigon/zk/datastream/types" zktx "github.com/ledgerwatch/erigon/zk/tx" zktypes "github.com/ledgerwatch/erigon/zk/types" ) @@ -207,3 +212,252 @@ func Test_PrepareForkId_DuringRecovery(t *testing.T) { }) } } + +// Mock implementation of zktx.DecodeTx for testing purposes +func mockDecodeTx(encoded []byte, effectiveGasPricePercentage byte, forkId uint64) (types.Transaction, uint8, error) { + return types.NewTransaction(0, common.Address{}, uint256.NewInt(0), 0, uint256.NewInt(0), encoded), 0, nil +} + +func TestResequenceBatchJob_HasMoreToProcess(t *testing.T) { + tests := []struct { + name string + job ResequenceBatchJob + expected bool + }{ + { + name: "Has more blocks to process", + job: ResequenceBatchJob{ + batchToProcess: []*dsTypes.FullL2Block{{}, {}}, + StartBlockIndex: 1, + StartTxIndex: 0, + }, + expected: true, + }, + { + name: "Has more transactions to process", + job: ResequenceBatchJob{ + batchToProcess: []*dsTypes.FullL2Block{{L2Txs: []dsTypes.L2TransactionProto{{}, {}}}}, + StartBlockIndex: 0, + StartTxIndex: 0, + }, + expected: true, + }, + { + name: "No more to process", + job: ResequenceBatchJob{ + batchToProcess: []*dsTypes.FullL2Block{{}}, + StartBlockIndex: 1, + StartTxIndex: 0, + }, + expected: false, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + if got := tt.job.HasMoreBlockToProcess(); got != tt.expected { + t.Errorf("ResequenceBatchJob.HasMoreBlockToProcess() = %v, want %v", got, tt.expected) + } + }) + } +} + +func TestResequenceBatchJob_CurrentBlock(t *testing.T) { + tests := []struct { + name string + job ResequenceBatchJob + expected *dsTypes.FullL2Block + }{ + { + name: "Has current block", + job: ResequenceBatchJob{ + batchToProcess: []*dsTypes.FullL2Block{{L2BlockNumber: 1}, {L2BlockNumber: 2}}, + StartBlockIndex: 0, + StartTxIndex: 0, + }, + expected: &dsTypes.FullL2Block{L2BlockNumber: 1}, + }, + { + name: "No current block", + job: ResequenceBatchJob{ + batchToProcess: []*dsTypes.FullL2Block{{L2BlockNumber: 1}}, + StartBlockIndex: 1, + StartTxIndex: 0, + }, + expected: nil, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + got := tt.job.CurrentBlock() + if (got == nil && tt.expected != nil) || (got != nil && tt.expected == nil) { + t.Errorf("ResequenceBatchJob.CurrentBlock() = %v, want %v", got, tt.expected) + } + if got != nil && tt.expected != nil && got.L2BlockNumber != tt.expected.L2BlockNumber { + t.Errorf("ResequenceBatchJob.CurrentBlock().L2BlockNumber = %v, 
want %v", got.L2BlockNumber, tt.expected.L2BlockNumber) + } + }) + } +} + +func TestResequenceBatchJob_YieldNextBlockTransactions(t *testing.T) { + // Replace the actual zktx.DecodeTx with our mock function for testing + + tests := []struct { + name string + job ResequenceBatchJob + expectedTxCount int + expectedError bool + }{ + { + name: "Yield transactions", + job: ResequenceBatchJob{ + batchToProcess: []*dsTypes.FullL2Block{ + { + L2Txs: []dsTypes.L2TransactionProto{{}, {}}, + ForkId: 1, + }, + }, + StartBlockIndex: 0, + StartTxIndex: 0, + txIndexMap: make(map[common.Hash]txMatadata), + }, + expectedTxCount: 2, + expectedError: false, + }, + { + name: "No transactions to yield", + job: ResequenceBatchJob{ + batchToProcess: []*dsTypes.FullL2Block{{}}, + StartBlockIndex: 1, + StartTxIndex: 0, + txIndexMap: make(map[common.Hash]txMatadata), + }, + expectedTxCount: 0, + expectedError: false, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + txs, err := tt.job.YieldNextBlockTransactions(mockDecodeTx) + if (err != nil) != tt.expectedError { + t.Errorf("ResequenceBatchJob.YieldNextBlockTransactions() error = %v, expectedError %v", err, tt.expectedError) + return + } + if len(txs) != tt.expectedTxCount { + t.Errorf("ResequenceBatchJob.YieldNextBlockTransactions() returned %d transactions, expected %d", len(txs), tt.expectedTxCount) + } + }) + } +} + +func TestResequenceBatchJob_YieldAndUpdate(t *testing.T) { + // Setup the batch + batch := []*dsTypes.FullL2Block{ + {L2Txs: []dsTypes.L2TransactionProto{{Encoded: []byte("1")}, {Encoded: []byte("2")}}, L2Blockhash: common.HexToHash("0")}, + {L2Txs: []dsTypes.L2TransactionProto{}, L2Blockhash: common.HexToHash("1")}, + {L2Txs: []dsTypes.L2TransactionProto{}, L2Blockhash: common.HexToHash("2")}, + {L2Txs: []dsTypes.L2TransactionProto{{Encoded: []byte("3")}, {Encoded: []byte("4")}}, L2Blockhash: common.HexToHash("3")}, + } + + job := ResequenceBatchJob{ + batchToProcess: batch, + StartBlockIndex: 0, + StartTxIndex: 1, // Start at block 0, index 1 + txIndexMap: make(map[common.Hash]txMatadata), + } + + processTransactions := func(txs []types.Transaction) { + for _, tx := range txs { + job.UpdateLastProcessedTx(tx.Hash()) + } + } + + // First call - should yield transaction 2 from block 0 + txs, err := job.YieldNextBlockTransactions(mockDecodeTx) + if err != nil { + t.Fatalf("First call: Unexpected error: %v", err) + } + if len(txs) != 1 || string(txs[0].GetData()) != "2" { + t.Errorf("Expected 1 transaction with data '2', got %d transactions with data '%s'", len(txs), string(txs[0].GetData())) + } + processTransactions(txs) + tx2 := txs[0] + + // Second call - should yield empty block (block 1) + txs, err = job.YieldNextBlockTransactions(mockDecodeTx) + if err != nil { + t.Fatalf("Second call: Unexpected error: %v", err) + } + if len(txs) != 0 { + t.Errorf("Expected 0 transactions, got %d", len(txs)) + } + job.UpdateLastProcessedTx(job.CurrentBlock().L2Blockhash) + + // Third call - should yield empty block (block 2) + txs, err = job.YieldNextBlockTransactions(mockDecodeTx) + if err != nil { + t.Fatalf("Third call: Unexpected error: %v", err) + } + if len(txs) != 0 { + t.Errorf("Expected 0 transactions, got %d", len(txs)) + } + job.UpdateLastProcessedTx(job.CurrentBlock().L2Blockhash) + + // Fourth call - should yield transactions 3 and 4, but we'll only process 3 + txs, err = job.YieldNextBlockTransactions(mockDecodeTx) + if err != nil { + t.Fatalf("Fourth call: Unexpected error: %v", err) + } + if len(txs) != 2 || 
string(txs[0].GetData()) != "3" || string(txs[1].GetData()) != "4" { + t.Errorf("Expected 2 transactions with data '3' and '4', got %d transactions", len(txs)) + } + processTransactions(txs[:1]) // Only process the first transaction (3) + tx3 := txs[0] + tx4 := txs[1] + + // Check final state + if job.StartBlockIndex != 3 { + t.Errorf("Expected StartBlockIndex to be 3, got %d", job.StartBlockIndex) + } + + if job.StartTxIndex != 1 { + t.Errorf("Expected StartTxIndex to be 1, got %d", job.StartTxIndex) + } + + // Final call - should yield transaction 4 + txs, err = job.YieldNextBlockTransactions(mockDecodeTx) + if err != nil { + t.Fatalf("Final call: Unexpected error: %v", err) + } + if len(txs) != 1 || string(txs[0].GetData()) != "4" { + t.Errorf("Expected 1 transaction with data '4', got %d transactions", len(txs)) + } + + processTransactions(txs) + + if job.HasMoreBlockToProcess() { + t.Errorf("Expected no more blocks to process") + } + + // Verify txIndexMap + expectedTxIndexMap := map[common.Hash]txMatadata{ + common.HexToHash("0"): {0, 0}, + common.HexToHash("1"): {1, 0}, + common.HexToHash("2"): {2, 0}, + common.HexToHash("3"): {3, 0}, + tx2.Hash(): {0, 1}, // Transaction 2 + tx3.Hash(): {3, 0}, // Transaction 3 + tx4.Hash(): {3, 1}, // Transaction 4 + } + + for hash, index := range expectedTxIndexMap { + if actualIndex, exists := job.txIndexMap[hash]; !exists { + t.Errorf("Expected hash %s to exist in txIndexMap", hash.Hex()) + } else if !reflect.DeepEqual(actualIndex, index) { + t.Errorf("For hash %s, expected index %v, got %v", hash.Hex(), index, actualIndex) + } + } +} diff --git a/zk/stages/test_utils.go b/zk/stages/test_utils.go index 62b130a9fa0..25a1798ee68 100644 --- a/zk/stages/test_utils.go +++ b/zk/stages/test_utils.go @@ -61,3 +61,7 @@ func (c *TestDatastreamClient) GetStreamingAtomic() *atomic.Bool { func (c *TestDatastreamClient) GetProgressAtomic() *atomic.Uint64 { return &c.progress } + +func (c *TestDatastreamClient) ReadBatches(start uint64, end uint64) ([][]*types.FullL2Block, error) { + return nil, nil +} diff --git a/zk/tx/tx.go b/zk/tx/tx.go index 93b0edc844e..e6f9debfc17 100644 --- a/zk/tx/tx.go +++ b/zk/tx/tx.go @@ -189,6 +189,8 @@ func DecodeBatchL2Blocks(txsData []byte, forkID uint64) ([]DecodedBatchL2Data, e return result, nil } +type TxDecoder func(encodedTx []byte, gasPricePercentage uint8, forkID uint64) (types.Transaction, uint8, error) + func DecodeTx(encodedTx []byte, efficiencyPercentage byte, forkId uint64) (types.Transaction, uint8, error) { // efficiencyPercentage := uint8(0) if forkId >= uint64(constants.ForkID5Dragonfruit) {
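
Reviewer note (not part of the patch): the sketch below illustrates how the new client.FileEntryIterator abstraction introduced in stream_client.go is meant to be consumed. Anything implementing NextFileEntry() (*types.FileEntry, error) can drive client.ReadParsedProto and server.ReadBatches; dataStreamServerIterator in data_stream_server.go is the production implementation over a datastreamer.StreamServer. The package name, sliceIterator, and readRange below are hypothetical, for illustration only, and assume the supplied entries start at the bookmark entry for the first requested batch (the role GetBookmark plays in DataStreamServer.ReadBatches).

package example

import (
	"github.com/ledgerwatch/erigon/zk/datastream/client"
	"github.com/ledgerwatch/erigon/zk/datastream/server"
	"github.com/ledgerwatch/erigon/zk/datastream/types"
)

// sliceIterator is a hypothetical in-memory FileEntryIterator (e.g. for tests).
// It returns queued entries one at a time and signals end-of-stream with
// (nil, nil), which ReadParsedProto and ReadBatches treat as "no more entries".
type sliceIterator struct {
	entries []*types.FileEntry
	pos     int
}

func (s *sliceIterator) NextFileEntry() (*types.FileEntry, error) {
	if s.pos >= len(s.entries) {
		return nil, nil // end of stream
	}
	entry := s.entries[s.pos]
	s.pos++
	return entry, nil
}

// readRange decodes batches start..end from pre-collected file entries using
// the helpers exported by this change. The entries are assumed to begin at the
// bookmark for batch `start`.
func readRange(entries []*types.FileEntry, start, end uint64) ([][]*types.FullL2Block, error) {
	var it client.FileEntryIterator = &sliceIterator{entries: entries}
	return server.ReadBatches(it, start, end)
}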