Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: CodecV7 #1583

Draft
wants to merge 4 commits into
base: develop
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions rollup/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,8 @@ require (
github.com/holiman/uint256 v1.2.4
github.com/mitchellh/mapstructure v1.5.0
github.com/prometheus/client_golang v1.16.0
github.com/scroll-tech/da-codec v0.1.2
github.com/scroll-tech/go-ethereum v1.10.14-0.20241023093931-91c2f9c27f4d
github.com/scroll-tech/da-codec v0.1.3-0.20241227041406-286f2092d4cd
github.com/scroll-tech/go-ethereum v1.10.14-0.20241210104312-bdf64cfb39dc
github.com/smartystreets/goconvey v1.8.0
github.com/spf13/viper v1.19.0
github.com/stretchr/testify v1.9.0
Expand Down
3 changes: 3 additions & 0 deletions rollup/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -267,8 +267,11 @@ github.com/sagikazarmark/slog-shim v0.1.0 h1:diDBnUNK9N/354PgrxMywXnAwEr1QZcOr6g
github.com/sagikazarmark/slog-shim v0.1.0/go.mod h1:SrcSrq8aKtyuqEI1uvTDTK1arOWRIczQRv+GVI1AkeQ=
github.com/scroll-tech/da-codec v0.1.2 h1:QyJ+dQ4zWVVJwuqxNt4MiKyrymVc6rHe4YPtURkjiRc=
github.com/scroll-tech/da-codec v0.1.2/go.mod h1:odz1ck3umvYccCG03osaQBISAYGinZktZYbpk94fYRE=
github.com/scroll-tech/da-codec v0.1.3-0.20241227041406-286f2092d4cd h1:Yq3vc5e9VqXKBEGAqpptPhviXlydoL3R8xX8R8C6YvY=
github.com/scroll-tech/da-codec v0.1.3-0.20241227041406-286f2092d4cd/go.mod h1:XfQhUl3msmE6dpZEbR/LIwiMxywPQcUQsch9URgXDzs=
github.com/scroll-tech/go-ethereum v1.10.14-0.20241023093931-91c2f9c27f4d h1:vuv7fGKEDtoeetI6RkKt8RAByJsYZBWk9Vo6gShv65c=
github.com/scroll-tech/go-ethereum v1.10.14-0.20241023093931-91c2f9c27f4d/go.mod h1:PWEOTg6LeWlJAlFJauO0msSLXWnpHmE+mVh5txtfeRM=
github.com/scroll-tech/go-ethereum v1.10.14-0.20241210104312-bdf64cfb39dc/go.mod h1:xRDJvaNUe7lCU2fB+AqyS7gahar+dfJPrUJplfXF4dw=
github.com/scroll-tech/zktrie v0.8.4 h1:UagmnZ4Z3ITCk+aUq9NQZJNAwnWl4gSxsLb2Nl7IgRE=
github.com/scroll-tech/zktrie v0.8.4/go.mod h1:XvNo7vAk8yxNyTjBDj5WIiFzYW4bx/gJ78+NK6Zn6Uk=
github.com/shirou/gopsutil v3.21.11+incompatible h1:+1+c1VGhc88SSonWP6foOcLhvnKlUeu/erjjvaPEYiI=
Expand Down
12 changes: 12 additions & 0 deletions rollup/internal/config/relayer.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,18 @@ type SenderConfig struct {
TxType string `json:"tx_type"`
// The maximum number of pending blob-carrying transactions
MaxPendingBlobTxs int64 `json:"max_pending_blob_txs"`

// Config for batch submission
BatchSubmission *BatchSubmission `json:"batch_submission"`
}

type BatchSubmission struct {
// The minimum number of batches to submit in a single transaction.
MinBatches int `json:"min_batches"`
// The maximum number of batches to submit in a single transaction.
MaxBatches int `json:"max_batches"`
// The time in seconds after which a batch is considered stale and should be submitted ignoring the min batch count.
TimeoutSec int64 `json:"timeout"`
}

// ChainMonitor this config is used to get batch status from chain_monitor API.
Expand Down
207 changes: 205 additions & 2 deletions rollup/internal/controller/relayer/l2_relayer.go
Original file line number Diff line number Diff line change
Expand Up @@ -373,11 +373,156 @@ func (r *Layer2Relayer) ProcessGasPriceOracle() {
// ProcessPendingBatches processes the pending batches by sending commitBatch transactions to layer 1.
func (r *Layer2Relayer) ProcessPendingBatches() {
// get pending batches from database in ascending order by their index.
dbBatches, err := r.batchOrm.GetFailedAndPendingBatches(r.ctx, 5)
dbBatches, err := r.batchOrm.GetFailedAndPendingBatches(r.ctx, max(5, r.cfg.SenderConfig.BatchSubmission.MaxBatches))
if err != nil {
log.Error("Failed to fetch pending L2 batches", "err", err)
return
}

var batchesToSubmit []*dbBatchWithChunksAndParent
var forceSubmit bool
for i, dbBatch := range dbBatches {
if i == 0 && encoding.CodecVersion(dbBatch.CodecVersion) < encoding.CodecV6 {
// if the first batch is not >= V6 then we need to submit batches one by one
colinlyguo marked this conversation as resolved.
Show resolved Hide resolved
r.processPendingBatchesV4(dbBatches)
return
}

batchesToSubmitLen := len(batchesToSubmit)
var dbChunks []*orm.Chunk
var dbParentBatch *orm.Batch

// Verify batches compatibility
{
dbChunks, err = r.chunkOrm.GetChunksInRange(r.ctx, dbBatch.StartChunkIndex, dbBatch.EndChunkIndex)
if err != nil {
log.Error("failed to get chunks in range", "err", err)
return
}

// check codec version
for _, dbChunk := range dbChunks {
if dbBatch.CodecVersion != dbChunk.CodecVersion {
log.Error("batch codec version is different from chunk codec version", "batch index", dbBatch.Index, "chunk index", dbChunk.Index, "batch codec version", dbBatch.CodecVersion, "chunk codec version", dbChunk.CodecVersion)
return
}
}

if dbBatch.Index == 0 {
log.Error("invalid args: batch index is 0, should only happen in committing genesis batch")
return
}

dbParentBatch, err = r.batchOrm.GetBatchByIndex(r.ctx, dbBatch.Index-1)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Most of these batches are already in memory (dbBatches).

if err != nil {
log.Error("failed to get parent batch header", "err", err)
return
}

if dbParentBatch.CodecVersion > dbBatch.CodecVersion {
log.Error("parent batch codec version is greater than current batch codec version", "index", dbBatch.Index, "hash", dbBatch.Hash, "parent codec version", dbParentBatch.CodecVersion, "current codec version", dbBatch.CodecVersion)
return
}

// make sure we commit batches of the same codec version together.
// If we encounter a batch with a different codec version, we stop here and will commit the batches we have so far.
// The next call of ProcessPendingBatches will then start with the batch with the different codec version.
if batchesToSubmitLen > 0 && batchesToSubmit[batchesToSubmitLen-1].Batch.CodecVersion != dbBatch.CodecVersion {
break
}
}

// if one of the batches is too old, we force submit all batches that we have so far in the next step
if !forceSubmit && time.Since(dbBatch.CreatedAt) > time.Duration(r.cfg.SenderConfig.BatchSubmission.TimeoutSec)*time.Second {
forceSubmit = true
}

if batchesToSubmitLen <= r.cfg.SenderConfig.BatchSubmission.MaxBatches {
batchesToSubmit = append(batchesToSubmit, &dbBatchWithChunksAndParent{
Batch: dbBatch,
Chunks: dbChunks,
ParentBatch: dbParentBatch,
})
}
}

if !forceSubmit && len(batchesToSubmit) < r.cfg.SenderConfig.BatchSubmission.MinBatches {
log.Debug("Not enough batches to submit", "count", len(batchesToSubmit), "minBatches", r.cfg.SenderConfig.BatchSubmission.MinBatches, "maxBatches", r.cfg.SenderConfig.BatchSubmission.MaxBatches)
return
}

if forceSubmit {
log.Info("Forcing submission of batches due to timeout", "batch index", batchesToSubmit[0].Batch.Index, "created at", batchesToSubmit[0].Batch.CreatedAt)
}

// We have at least 1 batch to commit
firstBatch := batchesToSubmit[0].Batch
lastBatch := batchesToSubmit[len(batchesToSubmit)-1].Batch

codecVersion := encoding.CodecVersion(firstBatch.CodecVersion)
switch codecVersion {
case encoding.CodecV6:
calldata, blob, err := r.constructCommitBatchPayloadCodecV6(batchesToSubmit)
if err != nil {
log.Error("failed to construct commitBatchWithBlobProof payload for V6", "codecVersion", codecVersion, "start index", firstBatch.Index, "end index", lastBatch.Index, "err", err)
return
}
default:
log.Error("unsupported codec version in ProcessPendingBatches", "codecVersion", codecVersion, "start index", firstBatch, "end index", lastBatch.Index)
return
}

//txHash, err := r.commitSender.SendTransaction(dbBatch.Hash, &r.cfg.RollupContractAddress, calldata, blob, 0)
//if err != nil {
// if errors.Is(err, sender.ErrTooManyPendingBlobTxs) {
// r.metrics.rollupL2RelayerProcessPendingBatchErrTooManyPendingBlobTxsTotal.Inc()
// log.Debug(
// "Skipped sending commitBatch tx to L1: too many pending blob txs",
// "maxPending", r.cfg.SenderConfig.MaxPendingBlobTxs,
// "err", err,
// )
// return
// }
// log.Error(
// "Failed to send commitBatch tx to layer1",
// "index", dbBatch.Index,
// "hash", dbBatch.Hash,
// "RollupContractAddress", r.cfg.RollupContractAddress,
// "err", err,
// "calldata", common.Bytes2Hex(calldata),
// )
// return
//}
//
//err = r.batchOrm.UpdateCommitTxHashAndRollupStatus(r.ctx, dbBatch.Hash, txHash.String(), types.RollupCommitting)
//if err != nil {
// log.Error("UpdateCommitTxHashAndRollupStatus failed", "hash", dbBatch.Hash, "index", dbBatch.Index, "err", err)
// return
//}
//
//var maxBlockHeight uint64
//var totalGasUsed uint64
//for _, dbChunk := range dbChunks {
// if dbChunk.EndBlockNumber > maxBlockHeight {
// maxBlockHeight = dbChunk.EndBlockNumber
// }
// totalGasUsed += dbChunk.TotalL2TxGas
//}
//r.metrics.rollupL2RelayerCommitBlockHeight.Set(float64(maxBlockHeight))
//r.metrics.rollupL2RelayerCommitThroughput.Add(float64(totalGasUsed))
//
//r.metrics.rollupL2RelayerProcessPendingBatchSuccessTotal.Inc()
//log.Info("Sent the commitBatch tx to layer1", "batch index", dbBatch.Index, "batch hash", dbBatch.Hash, "tx hash", txHash.String())

}

type dbBatchWithChunksAndParent struct {
Batch *orm.Batch
Chunks []*orm.Chunk
ParentBatch *orm.Batch
}

func (r *Layer2Relayer) processPendingBatchesV4(dbBatches []*orm.Batch) {
for _, dbBatch := range dbBatches {
r.metrics.rollupL2RelayerProcessPendingBatchTotal.Inc()

Expand Down Expand Up @@ -432,7 +577,7 @@ func (r *Layer2Relayer) ProcessPendingBatches() {
return
}
default:
log.Error("unsupported codec version", "codecVersion", codecVersion)
log.Error("unsupported codec version in processPendingBatchesV4", "codecVersion", codecVersion)
return
}

Expand Down Expand Up @@ -868,6 +1013,64 @@ func (r *Layer2Relayer) constructCommitBatchPayloadCodecV4(dbBatch *orm.Batch, d
return calldata, daBatch.Blob(), nil
}

func (r *Layer2Relayer) constructCommitBatchPayloadCodecV6(batchesToSubmit []*dbBatchWithChunksAndParent) ([]byte, []*kzg4844.Blob, error) {
blobs := make([]*kzg4844.Blob, len(batchesToSubmit))

version := encoding.CodecVersion(batchesToSubmit[0].Batch.CodecVersion)
var firstParentBatch *orm.Batch
// construct blobs
for _, b := range batchesToSubmit {
// double check that all batches have the same version
batchVersion := encoding.CodecVersion(b.Batch.CodecVersion)
if batchVersion != version {
return nil, nil, fmt.Errorf("codec version mismatch, expected: %d, got: %d for batches %d and %d", version, batchVersion, batchesToSubmit[0].Batch.Index, b.Batch.Index)
}

if firstParentBatch == nil {
firstParentBatch = b.ParentBatch
}

chunks := make([]*encoding.Chunk, len(b.Chunks))
for i, c := range b.Chunks {
blocks, err := r.l2BlockOrm.GetL2BlocksInRange(r.ctx, c.StartBlockNumber, c.EndBlockNumber)
if err != nil {
return nil, nil, fmt.Errorf("failed to get blocks in range for batch %d: %w", b.Batch.Index, err)
}
chunks[i] = &encoding.Chunk{Blocks: blocks}
}

encodingBatch := &encoding.Batch{
Index: b.Batch.Index,
TotalL1MessagePoppedBefore: b.Chunks[0].TotalL1MessagesPoppedBefore,
ParentBatchHash: common.HexToHash(b.ParentBatch.Hash),
Chunks: chunks,
}

codec, err := encoding.CodecFromVersion(version)
if err != nil {
return nil, nil, fmt.Errorf("failed to get codec from version %d, err: %w", b.Batch.CodecVersion, err)
}

daBatch, err := codec.NewDABatch(encodingBatch)
if err != nil {
return nil, nil, fmt.Errorf("failed to create DA batch: %w", err)
}

blobs = append(blobs, daBatch.Blob())
}

if firstParentBatch == nil {
return nil, nil, fmt.Errorf("firstParentBatch is nil")
}

// TODO: this needs to be updated once the contract interface is finalized
calldata, err := r.l1RollupABI.Pack("commitBatches", version, firstParentBatch.BatchHeader)
if err != nil {
return nil, nil, fmt.Errorf("failed to pack commitBatchWithBlobProof: %w", err)
}
return calldata, blobs, nil
}

func (r *Layer2Relayer) constructFinalizeBundlePayloadCodecV4(dbBatch *orm.Batch, aggProof *message.BundleProof) ([]byte, error) {
if aggProof != nil { // finalizeBundle with proof.
calldata, packErr := r.l1RollupABI.Pack(
Expand Down