Skip to content

Commit

Permalink
Enable a sequencer to re-sequence batches (#894)
Browse files Browse the repository at this point in the history
* Enable a sequencer to re-sequence batches

* Stop producing blocks after resequencing completes

* Introduce flag reuseL1InfoIndex

* Move ReadParsedProto back to data stream client in order to reduce merge conflict

* Skip iteration when the last batch is incomplete

* Wait for pending verifications during resequencing

* Address PR comments
  • Loading branch information
cffls authored Sep 20, 2024
1 parent 5e56ded commit 292a2ec
Show file tree
Hide file tree
Showing 17 changed files with 677 additions and 24 deletions.
3 changes: 2 additions & 1 deletion cmd/integration/commands/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ var (
pruneTBefore, pruneCBefore uint64
experiments []string
chain string // Which chain to use (mainnet, rinkeby, goerli, etc.)
config string

commitmentMode string
commitmentTrie string
Expand All @@ -49,7 +50,7 @@ func must(err error) {
}

func withConfig(cmd *cobra.Command) {
cmd.Flags().String("config", "", "yaml/toml config file location")
cmd.Flags().StringVar(&config, "config", "", "yaml/toml config file location")
}

func withMining(cmd *cobra.Command) {
Expand Down
6 changes: 0 additions & 6 deletions cmd/integration/commands/stage_stages_zkevm.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,8 @@ import (

common2 "github.com/gateway-fm/cdk-erigon-lib/common"
"github.com/gateway-fm/cdk-erigon-lib/kv"
"github.com/ledgerwatch/erigon/core"
"github.com/ledgerwatch/erigon/eth/ethconfig"
"github.com/ledgerwatch/erigon/eth/stagedsync/stages"
smtdb "github.com/ledgerwatch/erigon/smt/pkg/db"
erigoncli "github.com/ledgerwatch/erigon/turbo/cli"
"github.com/ledgerwatch/erigon/zk/hermez_db"
"github.com/ledgerwatch/log/v3"
"github.com/spf13/cobra"
Expand All @@ -28,9 +25,6 @@ state_stages_zkevm --datadir=/datadirs/hermez-mainnet --unwind-batch-no=2 --chai
Example: "go run ./cmd/integration state_stages_zkevm --config=... --verbosity=3 --unwind-batch-no=100",
Run: func(cmd *cobra.Command, args []string) {
ctx, _ := common2.RootContext()
ethConfig := &ethconfig.Defaults
ethConfig.Genesis = core.GenesisBlockByChainName(chain)
erigoncli.ApplyFlagsForEthConfigCobra(cmd.Flags(), ethConfig)
db := openDB(dbCfg(kv.ChainDB, chaindata), true)
defer db.Close()

Expand Down
40 changes: 39 additions & 1 deletion cmd/integration/commands/stages_zkevm.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,12 @@ package commands

import (
"context"
"encoding/json"
"math/big"
"os"
"path"
"path/filepath"
"strings"

"github.com/c2h5oh/datasize"
chain3 "github.com/gateway-fm/cdk-erigon-lib/chain"
Expand All @@ -10,11 +16,14 @@ import (
"github.com/gateway-fm/cdk-erigon-lib/kv/kvcfg"
"github.com/ledgerwatch/erigon/cmd/hack/tool/fromdb"
"github.com/ledgerwatch/erigon/cmd/sentry/sentry"
"github.com/ledgerwatch/erigon/cmd/utils"
"github.com/ledgerwatch/erigon/consensus"
"github.com/ledgerwatch/erigon/core"
"github.com/ledgerwatch/erigon/core/types"
"github.com/ledgerwatch/erigon/core/vm"
"github.com/ledgerwatch/erigon/eth/ethconfig"
"github.com/ledgerwatch/erigon/eth/stagedsync"
"github.com/ledgerwatch/erigon/params"
"github.com/ledgerwatch/erigon/turbo/shards"
stages2 "github.com/ledgerwatch/erigon/turbo/stages"
"github.com/ledgerwatch/erigon/zk/sequencer"
Expand All @@ -26,7 +35,36 @@ func newSyncZk(ctx context.Context, db kv.RwDB) (consensus.Engine, *vm.Config, *

vmConfig := &vm.Config{}

genesis := core.GenesisBlockByChainName(chain)
var genesis *types.Genesis

if strings.HasPrefix(chain, "dynamic") {
if config == "" {
panic("Config file is required for dynamic chain")
}

params.DynamicChainConfigPath = filepath.Dir(config)
genesis = core.GenesisBlockByChainName(chain)
filename := path.Join(params.DynamicChainConfigPath, chain+"-conf.json")

dConf := utils.DynamicConfig{}

if _, err := os.Stat(filename); err == nil {
dConfBytes, err := os.ReadFile(filename)
if err != nil {
panic(err)
}
if err := json.Unmarshal(dConfBytes, &dConf); err != nil {
panic(err)
}
}

genesis.Timestamp = dConf.Timestamp
genesis.GasLimit = dConf.GasLimit
genesis.Difficulty = big.NewInt(dConf.Difficulty)
} else {
genesis = core.GenesisBlockByChainName(chain)
}

chainConfig, genesisBlock, genesisErr := core.CommitGenesisBlock(db, genesis, "")
if _, ok := genesisErr.(*chain3.ConfigCompatError); genesisErr != nil && !ok {
panic(genesisErr)
Expand Down
15 changes: 15 additions & 0 deletions cmd/utils/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -506,6 +506,21 @@ var (
Usage: "Halt the sequencer on this batch number",
Value: 0,
}
SequencerResequence = cli.BoolFlag{
Name: "zkevm.sequencer-resequence",
Usage: "When enabled, the sequencer will automatically resequence unseen batches stored in data stream",
Value: false,
}
SequencerResequenceStrict = cli.BoolFlag{
Name: "zkevm.sequencer-resequence-strict",
Usage: "Strictly resequence the rolledback batches",
Value: true,
}
SequencerResequenceReuseL1InfoIndex = cli.BoolFlag{
Name: "zkevm.sequencer-resequence-reuse-l1-info-index",
Usage: "Reuse the L1 info index for resequencing",
Value: true,
}
ExecutorUrls = cli.StringFlag{
Name: "zkevm.executor-urls",
Usage: "A comma separated list of grpc addresses that host executors",
Expand Down
3 changes: 3 additions & 0 deletions eth/ethconfig/config_zkevm.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,9 @@ type Zk struct {
SequencerBatchVerificationTimeout time.Duration
SequencerTimeoutOnEmptyTxPool time.Duration
SequencerHaltOnBatchNumber uint64
SequencerResequence bool
SequencerResequenceStrict bool
SequencerResequenceReuseL1InfoIndex bool
ExecutorUrls []string
ExecutorStrictMode bool
ExecutorRequestTimeout time.Duration
Expand Down
3 changes: 3 additions & 0 deletions turbo/cli/default_flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,9 @@ var DefaultFlags = []cli.Flag{
&utils.SequencerBatchVerificationTimeout,
&utils.SequencerTimeoutOnEmptyTxPool,
&utils.SequencerHaltOnBatchNumber,
&utils.SequencerResequence,
&utils.SequencerResequenceStrict,
&utils.SequencerResequenceReuseL1InfoIndex,
&utils.ExecutorUrls,
&utils.ExecutorStrictMode,
&utils.ExecutorRequestTimeout,
Expand Down
3 changes: 3 additions & 0 deletions turbo/cli/flags_zkevm.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,9 @@ func ApplyFlagsForZkConfig(ctx *cli.Context, cfg *ethconfig.Config) {
SequencerBatchVerificationTimeout: sequencerBatchVerificationTimeout,
SequencerTimeoutOnEmptyTxPool: sequencerTimeoutOnEmptyTxPool,
SequencerHaltOnBatchNumber: ctx.Uint64(utils.SequencerHaltOnBatchNumber.Name),
SequencerResequence: ctx.Bool(utils.SequencerResequence.Name),
SequencerResequenceStrict: ctx.Bool(utils.SequencerResequenceStrict.Name),
SequencerResequenceReuseL1InfoIndex: ctx.Bool(utils.SequencerResequenceReuseL1InfoIndex.Name),
ExecutorUrls: strings.Split(strings.ReplaceAll(ctx.String(utils.ExecutorUrls.Name), " ", ""), ","),
ExecutorStrictMode: ctx.Bool(utils.ExecutorStrictMode.Name),
ExecutorRequestTimeout: ctx.Duration(utils.ExecutorRequestTimeout.Name),
Expand Down
20 changes: 14 additions & 6 deletions zk/datastream/client/stream_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,7 @@ func (c *StreamClient) ExecutePerFile(bookmark *types.BookmarkProto, function fu
if c.Header.TotalEntries == count {
break
}
file, err := c.readFileEntry()
file, err := c.NextFileEntry()
if err != nil {
return fmt.Errorf("reading file entry: %v", err)
}
Expand Down Expand Up @@ -304,7 +304,7 @@ LOOP:
c.conn.SetReadDeadline(time.Now().Add(c.checkTimeout))
}

parsedProto, localErr := c.readParsedProto()
parsedProto, localErr := ReadParsedProto(c)
if localErr != nil {
err = localErr
break
Expand Down Expand Up @@ -353,16 +353,24 @@ func (c *StreamClient) tryReConnect() error {
return err
}

func (c *StreamClient) readParsedProto() (
type FileEntryIterator interface {
NextFileEntry() (*types.FileEntry, error)
}

func ReadParsedProto(iterator FileEntryIterator) (
parsedEntry interface{},
err error,
) {
file, err := c.readFileEntry()
file, err := iterator.NextFileEntry()
if err != nil {
err = fmt.Errorf("read file entry error: %v", err)
return
}

if file == nil {
return nil, nil
}

switch file.EntryType {
case types.BookmarkEntryType:
parsedEntry, err = types.UnmarshalBookmark(file.Data)
Expand All @@ -384,7 +392,7 @@ func (c *StreamClient) readParsedProto() (
var l2Tx *types.L2TransactionProto
LOOP:
for {
if innerFile, err = c.readFileEntry(); err != nil {
if innerFile, err = iterator.NextFileEntry(); err != nil {
return
}

Expand Down Expand Up @@ -438,7 +446,7 @@ func (c *StreamClient) readParsedProto() (

// reads file bytes from socket and tries to parse them
// returns the parsed FileEntry
func (c *StreamClient) readFileEntry() (file *types.FileEntry, err error) {
func (c *StreamClient) NextFileEntry() (file *types.FileEntry, err error) {
// Read packet type
packet, err := readBuffer(c.conn, 1)
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion zk/datastream/client/stream_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,7 @@ func Test_readFileEntry(t *testing.T) {
server.Close()
}()

result, err := c.readFileEntry()
result, err := c.NextFileEntry()
require.Equal(t, testCase.expectedError, err)
assert.DeepEqual(t, testCase.expectedResult, result)
})
Expand Down
86 changes: 86 additions & 0 deletions zk/datastream/server/data_stream_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"github.com/gateway-fm/cdk-erigon-lib/kv"
"github.com/ledgerwatch/erigon/core/rawdb"
eritypes "github.com/ledgerwatch/erigon/core/types"
"github.com/ledgerwatch/erigon/zk/datastream/client"
"github.com/ledgerwatch/erigon/zk/datastream/proto/github.com/0xPolygonHermez/zkevm-node/state/datastream"
"github.com/ledgerwatch/erigon/zk/datastream/types"
)
Expand Down Expand Up @@ -599,3 +600,88 @@ func (srv *DataStreamServer) getLastEntryOfType(entryType datastreamer.EntryType

return emtryEntry, false, nil
}

type dataStreamServerIterator struct {
stream *datastreamer.StreamServer
curEntryNum uint64
header uint64
}

func newDataStreamServerIterator(stream *datastreamer.StreamServer, start uint64) *dataStreamServerIterator {
return &dataStreamServerIterator{
stream: stream,
curEntryNum: start,
header: stream.GetHeader().TotalEntries - 1,
}
}

func (it *dataStreamServerIterator) NextFileEntry() (entry *types.FileEntry, err error) {
if it.curEntryNum > it.header {
return nil, nil
}

var fileEntry datastreamer.FileEntry
fileEntry, err = it.stream.GetEntry(it.curEntryNum)
if err != nil {
return nil, err
}

it.curEntryNum += 1

return &types.FileEntry{
PacketType: uint8(fileEntry.Type),
Length: fileEntry.Length,
EntryType: types.EntryType(fileEntry.Type),
EntryNum: fileEntry.Number,
Data: fileEntry.Data,
}, nil
}

func (srv *DataStreamServer) ReadBatches(start uint64, end uint64) ([][]*types.FullL2Block, error) {
bookmark := types.NewBookmarkProto(start, datastream.BookmarkType_BOOKMARK_TYPE_BATCH)
marshalled, err := bookmark.Marshal()
if err != nil {
return nil, err
}

entryNum, err := srv.stream.GetBookmark(marshalled)

if err != nil {
return nil, err
}

iterator := newDataStreamServerIterator(srv.stream, entryNum)

return ReadBatches(iterator, start, end)
}

func ReadBatches(iterator client.FileEntryIterator, start uint64, end uint64) ([][]*types.FullL2Block, error) {
batches := make([][]*types.FullL2Block, end-start+1)

LOOP_ENTRIES:
for {
parsedProto, err := client.ReadParsedProto(iterator)
if err != nil {
return nil, err
}

if parsedProto == nil {
break
}

switch parsedProto := parsedProto.(type) {
case *types.BatchStart:
batches[parsedProto.Number-start] = []*types.FullL2Block{}
case *types.BatchEnd:
if parsedProto.Number == end {
break LOOP_ENTRIES
}
case *types.FullL2Block:
batches[parsedProto.BatchNumber-start] = append(batches[parsedProto.BatchNumber-start], parsedProto)
default:
continue
}
}

return batches, nil
}
7 changes: 7 additions & 0 deletions zk/legacy_executor_verifier/legacy_executor_verifier.go
Original file line number Diff line number Diff line change
Expand Up @@ -315,6 +315,13 @@ func (v *LegacyExecutorVerifier) VerifyWithoutExecutor(request *VerifierRequest)
return promise
}

func (v *LegacyExecutorVerifier) HasPendingVerifications() bool {
v.mtxPromises.Lock()
defer v.mtxPromises.Unlock()

return len(v.promises) > 0
}

func (v *LegacyExecutorVerifier) ProcessResultsSequentially(logPrefix string) ([]*VerifierBundle, error) {
v.mtxPromises.Lock()
defer v.mtxPromises.Unlock()
Expand Down
Loading

0 comments on commit 292a2ec

Please sign in to comment.