Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Enable a sequencer to re-sequence batches #894

Merged
merged 21 commits into from
Sep 20, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
0b97e62
Enable a sequencer to re-sequence batches
cffls Aug 1, 2024
e2a6df5
Stop producing blocks after resequencing completes
cffls Aug 8, 2024
1a110cf
Introduce flag reuseL1InfoIndex
cffls Aug 8, 2024
efc7748
Merge remote-tracking branch 'origin/zkevm' into resequence
cffls Aug 12, 2024
8376630
Merge remote-tracking branch 'origin/zkevm' into resequence
cffls Aug 13, 2024
8e8ea1e
Merge remote-tracking branch 'origin/zkevm' into resequence
cffls Aug 15, 2024
135a0fb
Merge remote-tracking branch 'origin/zkevm' into resequence
cffls Aug 19, 2024
407f919
Merge remote-tracking branch 'origin/zkevm' into resequence
cffls Aug 22, 2024
977de7a
Merge remote-tracking branch 'origin/zkevm' into resequence
cffls Aug 26, 2024
234144c
Move ReadParsedProto back to data stream client in order to reduce me…
cffls Aug 27, 2024
089573a
Merge remote-tracking branch 'origin/zkevm' into resequence
cffls Aug 28, 2024
a979f1b
Skip iteration when the last batch is incomplete
cffls Aug 28, 2024
37097f1
Merge remote-tracking branch 'origin/zkevm' into resequence
cffls Aug 30, 2024
c783cc9
Merge remote-tracking branch 'origin/zkevm' into resequence
cffls Sep 3, 2024
b8fb897
Merge remote-tracking branch 'origin/zkevm' into resequence
cffls Sep 5, 2024
eac6c21
Merge remote-tracking branch 'origin/zkevm' into resequence
cffls Sep 12, 2024
ed27ecc
Wait for pending verifications during resequencing
cffls Sep 13, 2024
46b2e1a
Merge remote-tracking branch 'origin/zkevm' into resequence
cffls Sep 16, 2024
2f1351b
Merge remote-tracking branch 'origin/zkevm' into resequence
cffls Sep 18, 2024
bccd57f
Address PR comments
cffls Sep 19, 2024
b69fcc2
Merge branch 'zkevm' into resequence
cffls Sep 20, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion cmd/integration/commands/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ var (
pruneTBefore, pruneCBefore uint64
experiments []string
chain string // Which chain to use (mainnet, rinkeby, goerli, etc.)
config string

commitmentMode string
commitmentTrie string
Expand All @@ -49,7 +50,7 @@ func must(err error) {
}

func withConfig(cmd *cobra.Command) {
cmd.Flags().String("config", "", "yaml/toml config file location")
cmd.Flags().StringVar(&config, "config", "", "yaml/toml config file location")
}

func withMining(cmd *cobra.Command) {
Expand Down
6 changes: 0 additions & 6 deletions cmd/integration/commands/stage_stages_zkevm.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,8 @@ import (

common2 "github.com/gateway-fm/cdk-erigon-lib/common"
"github.com/gateway-fm/cdk-erigon-lib/kv"
"github.com/ledgerwatch/erigon/core"
"github.com/ledgerwatch/erigon/eth/ethconfig"
"github.com/ledgerwatch/erigon/eth/stagedsync/stages"
smtdb "github.com/ledgerwatch/erigon/smt/pkg/db"
erigoncli "github.com/ledgerwatch/erigon/turbo/cli"
"github.com/ledgerwatch/erigon/zk/hermez_db"
"github.com/ledgerwatch/log/v3"
"github.com/spf13/cobra"
Expand All @@ -28,9 +25,6 @@ state_stages_zkevm --datadir=/datadirs/hermez-mainnet --unwind-batch-no=2 --chai
Example: "go run ./cmd/integration state_stages_zkevm --config=... --verbosity=3 --unwind-batch-no=100",
Run: func(cmd *cobra.Command, args []string) {
ctx, _ := common2.RootContext()
ethConfig := &ethconfig.Defaults
ethConfig.Genesis = core.GenesisBlockByChainName(chain)
erigoncli.ApplyFlagsForEthConfigCobra(cmd.Flags(), ethConfig)
db := openDB(dbCfg(kv.ChainDB, chaindata), true)
defer db.Close()

Expand Down
40 changes: 39 additions & 1 deletion cmd/integration/commands/stages_zkevm.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,12 @@ package commands

import (
"context"
"encoding/json"
"math/big"
"os"
"path"
"path/filepath"
"strings"

"github.com/c2h5oh/datasize"
chain3 "github.com/gateway-fm/cdk-erigon-lib/chain"
Expand All @@ -10,11 +16,14 @@ import (
"github.com/gateway-fm/cdk-erigon-lib/kv/kvcfg"
"github.com/ledgerwatch/erigon/cmd/hack/tool/fromdb"
"github.com/ledgerwatch/erigon/cmd/sentry/sentry"
"github.com/ledgerwatch/erigon/cmd/utils"
"github.com/ledgerwatch/erigon/consensus"
"github.com/ledgerwatch/erigon/core"
"github.com/ledgerwatch/erigon/core/types"
"github.com/ledgerwatch/erigon/core/vm"
"github.com/ledgerwatch/erigon/eth/ethconfig"
"github.com/ledgerwatch/erigon/eth/stagedsync"
"github.com/ledgerwatch/erigon/params"
"github.com/ledgerwatch/erigon/turbo/shards"
stages2 "github.com/ledgerwatch/erigon/turbo/stages"
"github.com/ledgerwatch/erigon/zk/sequencer"
Expand All @@ -26,7 +35,36 @@ func newSyncZk(ctx context.Context, db kv.RwDB) (consensus.Engine, *vm.Config, *

vmConfig := &vm.Config{}

genesis := core.GenesisBlockByChainName(chain)
var genesis *types.Genesis

if strings.HasPrefix(chain, "dynamic") {
if config == "" {
panic("Config file is required for dynamic chain")
}

params.DynamicChainConfigPath = filepath.Dir(config)
genesis = core.GenesisBlockByChainName(chain)
filename := path.Join(params.DynamicChainConfigPath, chain+"-conf.json")

dConf := utils.DynamicConfig{}

if _, err := os.Stat(filename); err == nil {
dConfBytes, err := os.ReadFile(filename)
if err != nil {
panic(err)
}
if err := json.Unmarshal(dConfBytes, &dConf); err != nil {
panic(err)
}
}

genesis.Timestamp = dConf.Timestamp
genesis.GasLimit = dConf.GasLimit
genesis.Difficulty = big.NewInt(dConf.Difficulty)
} else {
genesis = core.GenesisBlockByChainName(chain)
}

chainConfig, genesisBlock, genesisErr := core.CommitGenesisBlock(db, genesis, "")
if _, ok := genesisErr.(*chain3.ConfigCompatError); genesisErr != nil && !ok {
panic(genesisErr)
Expand Down
15 changes: 15 additions & 0 deletions cmd/utils/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -506,6 +506,21 @@ var (
Usage: "Halt the sequencer on this batch number",
Value: 0,
}
SequencerResequence = cli.BoolFlag{
Name: "zkevm.sequencer-resequence",
Usage: "When enabled, the sequencer will automatically resequence unseen batches stored in data stream",
Value: false,
}
SequencerResequenceStrict = cli.BoolFlag{
Name: "zkevm.sequencer-resequence-strict",
Usage: "Strictly resequence the rolledback batches",
Value: true,
}
SequencerResequenceReuseL1InfoIndex = cli.BoolFlag{
Name: "zkevm.sequencer-resequence-reuse-l1-info-index",
Usage: "Reuse the L1 info index for resequencing",
Value: true,
}
ExecutorUrls = cli.StringFlag{
Name: "zkevm.executor-urls",
Usage: "A comma separated list of grpc addresses that host executors",
Expand Down
3 changes: 3 additions & 0 deletions eth/ethconfig/config_zkevm.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,9 @@ type Zk struct {
SequencerBatchVerificationTimeout time.Duration
SequencerTimeoutOnEmptyTxPool time.Duration
SequencerHaltOnBatchNumber uint64
SequencerResequence bool
SequencerResequenceStrict bool
SequencerResequenceReuseL1InfoIndex bool
ExecutorUrls []string
ExecutorStrictMode bool
ExecutorRequestTimeout time.Duration
Expand Down
3 changes: 3 additions & 0 deletions turbo/cli/default_flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,9 @@ var DefaultFlags = []cli.Flag{
&utils.SequencerBatchVerificationTimeout,
&utils.SequencerTimeoutOnEmptyTxPool,
&utils.SequencerHaltOnBatchNumber,
&utils.SequencerResequence,
&utils.SequencerResequenceStrict,
&utils.SequencerResequenceReuseL1InfoIndex,
&utils.ExecutorUrls,
&utils.ExecutorStrictMode,
&utils.ExecutorRequestTimeout,
Expand Down
3 changes: 3 additions & 0 deletions turbo/cli/flags_zkevm.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,9 @@ func ApplyFlagsForZkConfig(ctx *cli.Context, cfg *ethconfig.Config) {
SequencerBatchVerificationTimeout: sequencerBatchVerificationTimeout,
SequencerTimeoutOnEmptyTxPool: sequencerTimeoutOnEmptyTxPool,
SequencerHaltOnBatchNumber: ctx.Uint64(utils.SequencerHaltOnBatchNumber.Name),
SequencerResequence: ctx.Bool(utils.SequencerResequence.Name),
SequencerResequenceStrict: ctx.Bool(utils.SequencerResequenceStrict.Name),
SequencerResequenceReuseL1InfoIndex: ctx.Bool(utils.SequencerResequenceReuseL1InfoIndex.Name),
ExecutorUrls: strings.Split(strings.ReplaceAll(ctx.String(utils.ExecutorUrls.Name), " ", ""), ","),
ExecutorStrictMode: ctx.Bool(utils.ExecutorStrictMode.Name),
ExecutorRequestTimeout: ctx.Duration(utils.ExecutorRequestTimeout.Name),
Expand Down
20 changes: 14 additions & 6 deletions zk/datastream/client/stream_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,7 @@ func (c *StreamClient) ExecutePerFile(bookmark *types.BookmarkProto, function fu
if c.Header.TotalEntries == count {
break
}
file, err := c.readFileEntry()
file, err := c.NextFileEntry()
if err != nil {
return fmt.Errorf("reading file entry: %v", err)
}
Expand Down Expand Up @@ -304,7 +304,7 @@ LOOP:
c.conn.SetReadDeadline(time.Now().Add(c.checkTimeout))
}

parsedProto, localErr := c.readParsedProto()
parsedProto, localErr := ReadParsedProto(c)
if localErr != nil {
err = localErr
break
Expand Down Expand Up @@ -353,16 +353,24 @@ func (c *StreamClient) tryReConnect() error {
return err
}

func (c *StreamClient) readParsedProto() (
type FileEntryIterator interface {
NextFileEntry() (*types.FileEntry, error)
}

func ReadParsedProto(iterator FileEntryIterator) (
parsedEntry interface{},
err error,
) {
file, err := c.readFileEntry()
file, err := iterator.NextFileEntry()
if err != nil {
err = fmt.Errorf("read file entry error: %v", err)
return
}

if file == nil {
return nil, nil
}

switch file.EntryType {
case types.BookmarkEntryType:
parsedEntry, err = types.UnmarshalBookmark(file.Data)
Expand All @@ -384,7 +392,7 @@ func (c *StreamClient) readParsedProto() (
var l2Tx *types.L2TransactionProto
LOOP:
for {
if innerFile, err = c.readFileEntry(); err != nil {
if innerFile, err = iterator.NextFileEntry(); err != nil {
return
}

Expand Down Expand Up @@ -438,7 +446,7 @@ func (c *StreamClient) readParsedProto() (

// reads file bytes from socket and tries to parse them
// returns the parsed FileEntry
func (c *StreamClient) readFileEntry() (file *types.FileEntry, err error) {
func (c *StreamClient) NextFileEntry() (file *types.FileEntry, err error) {
// Read packet type
packet, err := readBuffer(c.conn, 1)
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion zk/datastream/client/stream_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,7 @@ func Test_readFileEntry(t *testing.T) {
server.Close()
}()

result, err := c.readFileEntry()
result, err := c.NextFileEntry()
require.Equal(t, testCase.expectedError, err)
assert.DeepEqual(t, testCase.expectedResult, result)
})
Expand Down
86 changes: 86 additions & 0 deletions zk/datastream/server/data_stream_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"github.com/gateway-fm/cdk-erigon-lib/kv"
"github.com/ledgerwatch/erigon/core/rawdb"
eritypes "github.com/ledgerwatch/erigon/core/types"
"github.com/ledgerwatch/erigon/zk/datastream/client"
"github.com/ledgerwatch/erigon/zk/datastream/proto/github.com/0xPolygonHermez/zkevm-node/state/datastream"
"github.com/ledgerwatch/erigon/zk/datastream/types"
)
Expand Down Expand Up @@ -599,3 +600,88 @@ func (srv *DataStreamServer) getLastEntryOfType(entryType datastreamer.EntryType

return emtryEntry, false, nil
}

type dataStreamServerIterator struct {
stream *datastreamer.StreamServer
curEntryNum uint64
header uint64
}

func newDataStreamServerIterator(stream *datastreamer.StreamServer, start uint64) *dataStreamServerIterator {
return &dataStreamServerIterator{
stream: stream,
curEntryNum: start,
header: stream.GetHeader().TotalEntries - 1,
}
}

func (it *dataStreamServerIterator) NextFileEntry() (entry *types.FileEntry, err error) {
if it.curEntryNum > it.header {
return nil, nil
}

var fileEntry datastreamer.FileEntry
fileEntry, err = it.stream.GetEntry(it.curEntryNum)
if err != nil {
return nil, err
}

it.curEntryNum += 1

return &types.FileEntry{
PacketType: uint8(fileEntry.Type),
Length: fileEntry.Length,
EntryType: types.EntryType(fileEntry.Type),
EntryNum: fileEntry.Number,
Data: fileEntry.Data,
}, nil
}

func (srv *DataStreamServer) ReadBatches(start uint64, end uint64) ([][]*types.FullL2Block, error) {
bookmark := types.NewBookmarkProto(start, datastream.BookmarkType_BOOKMARK_TYPE_BATCH)
marshalled, err := bookmark.Marshal()
if err != nil {
return nil, err
}

entryNum, err := srv.stream.GetBookmark(marshalled)

if err != nil {
return nil, err
}

iterator := newDataStreamServerIterator(srv.stream, entryNum)

return ReadBatches(iterator, start, end)
}

func ReadBatches(iterator client.FileEntryIterator, start uint64, end uint64) ([][]*types.FullL2Block, error) {
batches := make([][]*types.FullL2Block, end-start+1)

LOOP_ENTRIES:
for {
parsedProto, err := client.ReadParsedProto(iterator)
if err != nil {
return nil, err
}

if parsedProto == nil {
break
}

switch parsedProto := parsedProto.(type) {
case *types.BatchStart:
batches[parsedProto.Number-start] = []*types.FullL2Block{}
case *types.BatchEnd:
if parsedProto.Number == end {
break LOOP_ENTRIES
}
case *types.FullL2Block:
batches[parsedProto.BatchNumber-start] = append(batches[parsedProto.BatchNumber-start], parsedProto)
default:
continue
}
}

return batches, nil
}
7 changes: 7 additions & 0 deletions zk/legacy_executor_verifier/legacy_executor_verifier.go
Original file line number Diff line number Diff line change
Expand Up @@ -315,6 +315,13 @@ func (v *LegacyExecutorVerifier) VerifyWithoutExecutor(request *VerifierRequest)
return promise
}

func (v *LegacyExecutorVerifier) HasPendingVerifications() bool {
v.mtxPromises.Lock()
defer v.mtxPromises.Unlock()

return len(v.promises) > 0
}

func (v *LegacyExecutorVerifier) ProcessResultsSequentially(logPrefix string) ([]*VerifierBundle, error) {
v.mtxPromises.Lock()
defer v.mtxPromises.Unlock()
Expand Down
Loading
Loading