Skip to content

Commit

Permalink
get block height for batchend from datastream (#1028)
Browse files Browse the repository at this point in the history
  • Loading branch information
V-Staykov authored and Stefan-Ethernal committed Sep 20, 2024
1 parent 9f417a7 commit 01a89a4
Show file tree
Hide file tree
Showing 6 changed files with 109 additions and 105 deletions.
4 changes: 2 additions & 2 deletions zk/datastream/client/stream_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -303,10 +303,10 @@ func (c *StreamClient) ExecutePerFile(bookmark *types.BookmarkProto, function fu
}
file, err := c.readFileEntry()
if err != nil {
return fmt.Errorf("error reading file entry: %v", err)
return fmt.Errorf("reading file entry: %v", err)
}
if err := function(file); err != nil {
return fmt.Errorf("error executing function: %v", err)
return fmt.Errorf("executing function: %v", err)

}
count++
Expand Down
181 changes: 87 additions & 94 deletions zk/debug_tools/datastream-correctness-check/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"fmt"

"github.com/gateway-fm/cdk-erigon-lib/common"
"github.com/ledgerwatch/erigon/zk/datastream/client"
"github.com/ledgerwatch/erigon/zk/datastream/proto/github.com/0xPolygonHermez/zkevm-node/state/datastream"
"github.com/ledgerwatch/erigon/zk/datastream/types"
Expand All @@ -27,123 +28,115 @@ func main() {
}

// create bookmark
bookmark := types.NewBookmarkProto(5191325, datastream.BookmarkType_BOOKMARK_TYPE_L2_BLOCK)
bookmark := types.NewBookmarkProto(0, datastream.BookmarkType_BOOKMARK_TYPE_L2_BLOCK)

// var previousFile *types.FileEntry
var previousFile *types.FileEntry
var lastBlockRoot common.Hash
progressBatch := uint64(0)
progressBlock := uint64(0)

printFunction := func(file *types.FileEntry) error {
function := func(file *types.FileEntry) error {
switch file.EntryType {
case types.EntryTypeL2Block:
l2Block, err := types.UnmarshalL2Block(file.Data)
case types.EntryTypeL2BlockEnd:
if previousFile != nil && previousFile.EntryType != types.EntryTypeL2Block && previousFile.EntryType != types.EntryTypeL2Tx {
return fmt.Errorf("unexpected entry type before l2 block end: %v", previousFile.EntryType)
}
case types.BookmarkEntryType:
bookmark, err := types.UnmarshalBookmark(file.Data)
if err != nil {
return err
}
fmt.Println("L2Block: ", l2Block.L2BlockNumber, "batch", l2Block.BatchNumber, "stateRoot", l2Block.StateRoot.Hex())
if l2Block.L2BlockNumber > 5191335 {
return fmt.Errorf("stop")
if bookmark.BookmarkType() == datastream.BookmarkType_BOOKMARK_TYPE_BATCH {
progressBatch = bookmark.Value
if previousFile != nil && previousFile.EntryType != types.EntryTypeBatchEnd {
return fmt.Errorf("unexpected entry type before batch bookmark type: %v, bookmark batch number: %d", previousFile.EntryType, bookmark.Value)
}
}
if bookmark.BookmarkType() == datastream.BookmarkType_BOOKMARK_TYPE_L2_BLOCK {
progressBlock = bookmark.Value
if previousFile != nil &&
previousFile.EntryType != types.EntryTypeBatchStart &&
previousFile.EntryType != types.EntryTypeL2BlockEnd {
return fmt.Errorf("unexpected entry type before block bookmark type: %v, bookmark block number: %d", previousFile.EntryType, bookmark.Value)
}
}
case types.EntryTypeBatchStart:
batchStart, err := types.UnmarshalBatchStart(file.Data)
if err != nil {
return err
}
progressBatch = batchStart.Number
if previousFile != nil {
if previousFile.EntryType != types.BookmarkEntryType {
return fmt.Errorf("unexpected entry type before batch start: %v, batchStart Batch number: %d", previousFile.EntryType, batchStart.Number)
} else {
bookmark, err := types.UnmarshalBookmark(previousFile.Data)
if err != nil {
return err
}
if bookmark.BookmarkType() != datastream.BookmarkType_BOOKMARK_TYPE_BATCH {
return fmt.Errorf("unexpected bookmark type before batch start: %v, batchStart Batch number: %d", bookmark.BookmarkType(), batchStart.Number)
}
}
}
case types.EntryTypeBatchEnd:
if previousFile != nil &&
previousFile.EntryType != types.EntryTypeL2BlockEnd &&
previousFile.EntryType != types.EntryTypeL2Tx &&
previousFile.EntryType != types.EntryTypeL2Block &&
previousFile.EntryType != types.EntryTypeBatchStart {
return fmt.Errorf("unexpected entry type before batch end: %v", previousFile.EntryType)
}
batchEnd, err := types.UnmarshalBatchEnd(file.Data)
if err != nil {
return err
}
fmt.Println("BatchEnd: ", batchEnd.Number, "stateRoot", batchEnd.StateRoot.Hex())
if batchEnd.Number != progressBatch {
return fmt.Errorf("batch end number mismatch: %d, expected: %d", batchEnd.Number, progressBatch)
}
if batchEnd.StateRoot != lastBlockRoot {
return fmt.Errorf("batch end state root mismatch: %x, expected: %x", batchEnd.StateRoot, lastBlockRoot)
}
case types.EntryTypeL2Tx:
if previousFile != nil && previousFile.EntryType != types.EntryTypeL2Tx && previousFile.EntryType != types.EntryTypeL2Block {
return fmt.Errorf("unexpected entry type before l2 tx: %v", previousFile.EntryType)
}
case types.EntryTypeL2Block:
l2Block, err := types.UnmarshalL2Block(file.Data)
if err != nil {
return err
}
progressBlock = l2Block.L2BlockNumber
if previousFile != nil {
if previousFile.EntryType != types.BookmarkEntryType && !previousFile.IsL2BlockEnd() {
return fmt.Errorf("unexpected entry type before l2 block: %v, block number: %d", previousFile.EntryType, l2Block.L2BlockNumber)
} else {
bookmark, err := types.UnmarshalBookmark(previousFile.Data)
if err != nil {
return err
}
if bookmark.BookmarkType() != datastream.BookmarkType_BOOKMARK_TYPE_L2_BLOCK {
return fmt.Errorf("unexpected bookmark type before l2 block: %v, block number: %d", bookmark.BookmarkType(), l2Block.L2BlockNumber)
}

}
}
lastBlockRoot = l2Block.StateRoot
case types.EntryTypeGerUpdate:
return nil
default:
return fmt.Errorf("unexpected entry type: %v", file.EntryType)
}

previousFile = file
return nil
}

// function := func(file *types.FileEntry) error {
// switch file.EntryType {
// case types.EntryTypeL2BlockEnd:
// if previousFile != nil && previousFile.EntryType != types.EntryTypeL2Block && previousFile.EntryType != types.EntryTypeL2Tx {
// return fmt.Errorf("unexpected entry type before l2 block end: %v", previousFile.EntryType)
// }
// case types.BookmarkEntryType:
// bookmark, err := types.UnmarshalBookmark(file.Data)
// if err != nil {
// return err
// }
// if bookmark.BookmarkType() == datastream.BookmarkType_BOOKMARK_TYPE_BATCH {
// progressBatch = bookmark.Value
// if previousFile != nil && previousFile.EntryType != types.EntryTypeBatchEnd {
// return fmt.Errorf("unexpected entry type before batch bookmark type: %v, bookmark batch number: %d", previousFile.EntryType, bookmark.Value)
// }
// }
// if bookmark.BookmarkType() == datastream.BookmarkType_BOOKMARK_TYPE_L2_BLOCK {
// progressBlock = bookmark.Value
// if previousFile != nil &&
// previousFile.EntryType != types.EntryTypeBatchStart &&
// previousFile.EntryType != types.EntryTypeL2BlockEnd {
// return fmt.Errorf("unexpected entry type before block bookmark type: %v, bookmark block number: %d", previousFile.EntryType, bookmark.Value)
// }
// }
// case types.EntryTypeBatchStart:
// batchStart, err := types.UnmarshalBatchStart(file.Data)
// if err != nil {
// return err
// }
// progressBatch = batchStart.Number
// if previousFile != nil {
// if previousFile.EntryType != types.BookmarkEntryType {
// return fmt.Errorf("unexpected entry type before batch start: %v, batchStart Batch number: %d", previousFile.EntryType, batchStart.Number)
// } else {
// bookmark, err := types.UnmarshalBookmark(previousFile.Data)
// if err != nil {
// return err
// }
// if bookmark.BookmarkType() != datastream.BookmarkType_BOOKMARK_TYPE_BATCH {
// return fmt.Errorf("unexpected bookmark type before batch start: %v, batchStart Batch number: %d", bookmark.BookmarkType(), batchStart.Number)
// }
// }
// }
// case types.EntryTypeBatchEnd:
// if previousFile != nil &&
// previousFile.EntryType != types.EntryTypeL2BlockEnd &&
// previousFile.EntryType != types.EntryTypeBatchStart {
// return fmt.Errorf("unexpected entry type before batch end: %v", previousFile.EntryType)
// }
// case types.EntryTypeL2Tx:
// if previousFile != nil && previousFile.EntryType != types.EntryTypeL2Tx && previousFile.EntryType != types.EntryTypeL2Block {
// return fmt.Errorf("unexpected entry type before l2 tx: %v", previousFile.EntryType)
// }
// case types.EntryTypeL2Block:
// l2Block, err := types.UnmarshalL2Block(file.Data)
// if err != nil {
// return err
// }
// progressBlock = l2Block.L2BlockNumber
// if previousFile != nil {
// if previousFile.EntryType != types.BookmarkEntryType && !previousFile.IsL2BlockEnd() {
// return fmt.Errorf("unexpected entry type before l2 block: %v, block number: %d", previousFile.EntryType, l2Block.L2BlockNumber)
// } else {
// bookmark, err := types.UnmarshalBookmark(previousFile.Data)
// if err != nil {
// return err
// }
// if bookmark.BookmarkType() != datastream.BookmarkType_BOOKMARK_TYPE_L2_BLOCK {
// return fmt.Errorf("unexpected bookmark type before l2 block: %v, block number: %d", bookmark.BookmarkType(), l2Block.L2BlockNumber)
// }

// }
// }
// case types.EntryTypeGerUpdate:
// return nil
// default:
// return fmt.Errorf("unexpected entry type: %v", file.EntryType)
// }

// previousFile = file
// return nil
// }
// send start command
err = client.ExecutePerFile(bookmark, printFunction)
err = client.ExecutePerFile(bookmark, function)
fmt.Println("progress block: ", progressBlock)
fmt.Println("progress batch: ", progressBatch)
if err != nil {
panic(fmt.Sprintf("found an error: %s", err))
panic(err)
}
}
5 changes: 5 additions & 0 deletions zk/stages/stage_batches.go
Original file line number Diff line number Diff line change
Expand Up @@ -272,6 +272,7 @@ func SpawnStageBatches(
}

lastHash := emptyHash
lastBlockRoot := emptyHash
atLeastOneBlockWritten := false
startTime := time.Now()

Expand Down Expand Up @@ -306,6 +307,9 @@ LOOP:
}
}
case *types.BatchEnd:
if entry.StateRoot != lastBlockRoot {
log.Warn(fmt.Sprintf("[%s] batch end state root mismatches last block's: %x, expected: %x", logPrefix, entry.StateRoot, lastBlockRoot))
}
if err := writeBatchEnd(hermezDb, entry); err != nil {
return fmt.Errorf("write batch end error: %v", err)
}
Expand Down Expand Up @@ -462,6 +466,7 @@ LOOP:
}

lastHash = entry.L2Blockhash
lastBlockRoot = entry.StateRoot

atLeastOneBlockWritten = true
lastBlockHeight = entry.L2BlockNumber
Expand Down
3 changes: 1 addition & 2 deletions zk/stages/stage_sequence_execute.go
Original file line number Diff line number Diff line change
Expand Up @@ -312,8 +312,7 @@ func SpawnSequencingStage(
}
}

block, err = doFinishBlockAndUpdateState(batchContext, ibs, header, parentBlock, batchState, ger, l1BlockHash, l1TreeUpdateIndex, infoTreeIndexProgress, batchCounters)
if err != nil {
if block, err = doFinishBlockAndUpdateState(batchContext, ibs, header, parentBlock, batchState, ger, l1BlockHash, l1TreeUpdateIndex, infoTreeIndexProgress, batchCounters); err != nil {
return err
}

Expand Down
12 changes: 8 additions & 4 deletions zk/stages/stage_sequence_execute_batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,11 +145,16 @@ func runBatchLastSteps(
return err
}

lastBlock, err := batchContext.sdb.hermezDb.GetHighestBlockInBatch(thisBatch)
// get the last block number written to batch
// we should match it's state root in batch end entry
// if we get the last block in DB errors may occur since we have DB unwinds AFTER we commit batch end to datastream
// the last block written to the datastream before batch end should be the correct one once we are here
// if it is not, we have a bigger problem
lastBlockNumber, err := batchContext.cfg.datastreamServer.GetHighestBlockNumber()
if err != nil {
return err
}
block, err := rawdb.ReadBlockByNumber(batchContext.sdb.tx, lastBlock)
block, err := rawdb.ReadBlockByNumber(batchContext.sdb.tx, lastBlockNumber)
if err != nil {
return err
}
Expand All @@ -159,8 +164,7 @@ func runBatchLastSteps(
}

// the unwind of this value is handed by UnwindExecutionStageDbWrites
_, err = rawdb.IncrementStateVersionByBlockNumberIfNeeded(batchContext.sdb.tx, lastBlock)
if err != nil {
if _, err = rawdb.IncrementStateVersionByBlockNumberIfNeeded(batchContext.sdb.tx, lastBlockNumber); err != nil {
return fmt.Errorf("writing plain state version: %w", err)
}

Expand Down
9 changes: 6 additions & 3 deletions zk/stages/stage_sequence_execute_data_stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,12 @@ import (
"fmt"

"github.com/ledgerwatch/erigon/core/rawdb"
"github.com/ledgerwatch/erigon/core/vm"
"github.com/ledgerwatch/erigon/eth/stagedsync"
"github.com/ledgerwatch/erigon/eth/stagedsync/stages"
"github.com/ledgerwatch/erigon/zk/datastream/server"
verifier "github.com/ledgerwatch/erigon/zk/legacy_executor_verifier"
"github.com/ledgerwatch/log/v3"
"github.com/ledgerwatch/erigon/eth/stagedsync/stages"
"github.com/ledgerwatch/erigon/eth/stagedsync"
"github.com/ledgerwatch/erigon/core/vm"
)

type SequencerBatchStreamWriter struct {
Expand Down Expand Up @@ -106,6 +106,9 @@ func handleBatchEndChecks(batchContext *BatchContext, batchState *BatchState, th
latestCounters := vm.NewCountersFromUsedMap(rawCounters)

endBatchCounters, err := prepareBatchCounters(batchContext, batchState, latestCounters)
if err != nil {
return false, err
}

if err = runBatchLastSteps(batchContext, lastBatch, thisBlock, endBatchCounters); err != nil {
return false, err
Expand Down

0 comments on commit 01a89a4

Please sign in to comment.