Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: CodecV7 #1583

Draft
wants to merge 4 commits into
base: develop
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions rollup/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,8 @@ require (
github.com/holiman/uint256 v1.2.4
github.com/mitchellh/mapstructure v1.5.0
github.com/prometheus/client_golang v1.16.0
github.com/scroll-tech/da-codec v0.1.2
github.com/scroll-tech/go-ethereum v1.10.14-0.20241023093931-91c2f9c27f4d
github.com/scroll-tech/da-codec v0.1.3-0.20241227041406-286f2092d4cd
github.com/scroll-tech/go-ethereum v1.10.14-0.20241210104312-bdf64cfb39dc
github.com/smartystreets/goconvey v1.8.0
github.com/spf13/viper v1.19.0
github.com/stretchr/testify v1.9.0
Expand Down
3 changes: 3 additions & 0 deletions rollup/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -267,8 +267,11 @@ github.com/sagikazarmark/slog-shim v0.1.0 h1:diDBnUNK9N/354PgrxMywXnAwEr1QZcOr6g
github.com/sagikazarmark/slog-shim v0.1.0/go.mod h1:SrcSrq8aKtyuqEI1uvTDTK1arOWRIczQRv+GVI1AkeQ=
github.com/scroll-tech/da-codec v0.1.2 h1:QyJ+dQ4zWVVJwuqxNt4MiKyrymVc6rHe4YPtURkjiRc=
github.com/scroll-tech/da-codec v0.1.2/go.mod h1:odz1ck3umvYccCG03osaQBISAYGinZktZYbpk94fYRE=
github.com/scroll-tech/da-codec v0.1.3-0.20241227041406-286f2092d4cd h1:Yq3vc5e9VqXKBEGAqpptPhviXlydoL3R8xX8R8C6YvY=
github.com/scroll-tech/da-codec v0.1.3-0.20241227041406-286f2092d4cd/go.mod h1:XfQhUl3msmE6dpZEbR/LIwiMxywPQcUQsch9URgXDzs=
github.com/scroll-tech/go-ethereum v1.10.14-0.20241023093931-91c2f9c27f4d h1:vuv7fGKEDtoeetI6RkKt8RAByJsYZBWk9Vo6gShv65c=
github.com/scroll-tech/go-ethereum v1.10.14-0.20241023093931-91c2f9c27f4d/go.mod h1:PWEOTg6LeWlJAlFJauO0msSLXWnpHmE+mVh5txtfeRM=
github.com/scroll-tech/go-ethereum v1.10.14-0.20241210104312-bdf64cfb39dc/go.mod h1:xRDJvaNUe7lCU2fB+AqyS7gahar+dfJPrUJplfXF4dw=
github.com/scroll-tech/zktrie v0.8.4 h1:UagmnZ4Z3ITCk+aUq9NQZJNAwnWl4gSxsLb2Nl7IgRE=
github.com/scroll-tech/zktrie v0.8.4/go.mod h1:XvNo7vAk8yxNyTjBDj5WIiFzYW4bx/gJ78+NK6Zn6Uk=
github.com/shirou/gopsutil v3.21.11+incompatible h1:+1+c1VGhc88SSonWP6foOcLhvnKlUeu/erjjvaPEYiI=
Expand Down
12 changes: 12 additions & 0 deletions rollup/internal/config/relayer.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,18 @@ type SenderConfig struct {
TxType string `json:"tx_type"`
// The maximum number of pending blob-carrying transactions
MaxPendingBlobTxs int64 `json:"max_pending_blob_txs"`

// Config for batch submission
BatchSubmission *BatchSubmission `json:"batch_submission"`
}

type BatchSubmission struct {
// The minimum number of batches to submit in a single transaction.
MinBatches int `json:"min_batches"`
// The maximum number of batches to submit in a single transaction.
MaxBatches int `json:"max_batches"`
// The time in seconds after which a batch is considered stale and should be submitted ignoring the min batch count.
TimeoutSec int64 `json:"timeout"`
}

// ChainMonitor this config is used to get batch status from chain_monitor API.
Expand Down
252 changes: 246 additions & 6 deletions rollup/internal/controller/relayer/l2_relayer.go
Original file line number Diff line number Diff line change
Expand Up @@ -373,11 +373,178 @@ func (r *Layer2Relayer) ProcessGasPriceOracle() {
// ProcessPendingBatches processes the pending batches by sending commitBatch transactions to layer 1.
func (r *Layer2Relayer) ProcessPendingBatches() {
// get pending batches from database in ascending order by their index.
dbBatches, err := r.batchOrm.GetFailedAndPendingBatches(r.ctx, 5)
dbBatches, err := r.batchOrm.GetFailedAndPendingBatches(r.ctx, max(5, r.cfg.SenderConfig.BatchSubmission.MaxBatches))
if err != nil {
log.Error("Failed to fetch pending L2 batches", "err", err)
return
}

var batchesToSubmit []*dbBatchWithChunksAndParent
var forceSubmit bool
for i, dbBatch := range dbBatches {
if i == 0 && encoding.CodecVersion(dbBatch.CodecVersion) < encoding.CodecV6 {
// if the first batch is not >= V6 then we need to submit batches one by one
colinlyguo marked this conversation as resolved.
Show resolved Hide resolved
r.processPendingBatchesV4(dbBatches)
return
}

batchesToSubmitLen := len(batchesToSubmit)
var dbChunks []*orm.Chunk
var dbParentBatch *orm.Batch

// Verify batches compatibility
{
dbChunks, err = r.chunkOrm.GetChunksInRange(r.ctx, dbBatch.StartChunkIndex, dbBatch.EndChunkIndex)
if err != nil {
log.Error("failed to get chunks in range", "err", err)
return
}

// check codec version
for _, dbChunk := range dbChunks {
if dbBatch.CodecVersion != dbChunk.CodecVersion {
log.Error("batch codec version is different from chunk codec version", "batch index", dbBatch.Index, "chunk index", dbChunk.Index, "batch codec version", dbBatch.CodecVersion, "chunk codec version", dbChunk.CodecVersion)
return
}
}

if dbBatch.Index == 0 {
log.Error("invalid args: batch index is 0, should only happen in committing genesis batch")
return
}

dbParentBatch, err = r.batchOrm.GetBatchByIndex(r.ctx, dbBatch.Index-1)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Most of these batches are already in memory (dbBatches).

if err != nil {
log.Error("failed to get parent batch header", "err", err)
return
}

if dbParentBatch.CodecVersion > dbBatch.CodecVersion {
log.Error("parent batch codec version is greater than current batch codec version", "index", dbBatch.Index, "hash", dbBatch.Hash, "parent codec version", dbParentBatch.CodecVersion, "current codec version", dbBatch.CodecVersion)
return
}

// make sure we commit batches of the same codec version together.
// If we encounter a batch with a different codec version, we stop here and will commit the batches we have so far.
// The next call of ProcessPendingBatches will then start with the batch with the different codec version.
if batchesToSubmitLen > 0 && batchesToSubmit[batchesToSubmitLen-1].Batch.CodecVersion != dbBatch.CodecVersion {
break
}
}

// if one of the batches is too old, we force submit all batches that we have so far in the next step
if !forceSubmit && time.Since(dbBatch.CreatedAt) > time.Duration(r.cfg.SenderConfig.BatchSubmission.TimeoutSec)*time.Second {
forceSubmit = true
}

if batchesToSubmitLen <= r.cfg.SenderConfig.BatchSubmission.MaxBatches {
batchesToSubmit = append(batchesToSubmit, &dbBatchWithChunksAndParent{
Batch: dbBatch,
Chunks: dbChunks,
ParentBatch: dbParentBatch,
})
}
}

if !forceSubmit && len(batchesToSubmit) < r.cfg.SenderConfig.BatchSubmission.MinBatches {
log.Debug("Not enough batches to submit", "count", len(batchesToSubmit), "minBatches", r.cfg.SenderConfig.BatchSubmission.MinBatches, "maxBatches", r.cfg.SenderConfig.BatchSubmission.MaxBatches)
return
}

if forceSubmit {
log.Info("Forcing submission of batches due to timeout", "batch index", batchesToSubmit[0].Batch.Index, "created at", batchesToSubmit[0].Batch.CreatedAt)
}

// We have at least 1 batch to commit
firstBatch := batchesToSubmit[0].Batch
lastBatch := batchesToSubmit[len(batchesToSubmit)-1].Batch

var calldata []byte
var blobs []*kzg4844.Blob
var maxBlockHeight uint64
var totalGasUsed uint64

codecVersion := encoding.CodecVersion(firstBatch.CodecVersion)
switch codecVersion {
case encoding.CodecV6:
calldata, blobs, maxBlockHeight, totalGasUsed, err = r.constructCommitBatchPayloadCodecV6(batchesToSubmit)
if err != nil {
log.Error("failed to construct commitBatchWithBlobProof payload for V6", "codecVersion", codecVersion, "start index", firstBatch.Index, "end index", lastBatch.Index, "err", err)
return
}
default:
log.Error("unsupported codec version in ProcessPendingBatches", "codecVersion", codecVersion, "start index", firstBatch, "end index", lastBatch.Index)
return
}

txHash, err := r.commitSender.SendTransaction(r.contextIDFromBatches(batchesToSubmit), &r.cfg.RollupContractAddress, calldata, blobs, 0)
if err != nil {
if errors.Is(err, sender.ErrTooManyPendingBlobTxs) {
r.metrics.rollupL2RelayerProcessPendingBatchErrTooManyPendingBlobTxsTotal.Inc()
log.Debug(
"Skipped sending commitBatch tx to L1: too many pending blob txs",
"maxPending", r.cfg.SenderConfig.MaxPendingBlobTxs,
"err", err,
)
return
}
log.Error(
"Failed to send commitBatch tx to layer1",
"start index", firstBatch.Index,
"start hash", firstBatch.Hash,
"end index", lastBatch.Index,
"end hash", lastBatch.Hash,
"RollupContractAddress", r.cfg.RollupContractAddress,
"err", err,
"calldata", common.Bytes2Hex(calldata),
)
return
}

if err = r.db.Transaction(func(dbTX *gorm.DB) error {
for _, dbBatch := range batchesToSubmit {
if err = r.batchOrm.UpdateCommitTxHashAndRollupStatus(r.ctx, dbBatch.Batch.Hash, txHash.String(), types.RollupCommitting, dbTX); err != nil {
return fmt.Errorf("UpdateCommitTxHashAndRollupStatus failed for batch %d: %s, err %v", dbBatch.Batch.Index, dbBatch.Batch.Hash, err)
}
}

return nil
}); err != nil {
log.Error("failed to update status for batches to RollupCommitting", "err", err)
}

r.metrics.rollupL2RelayerCommitBlockHeight.Set(float64(maxBlockHeight))
r.metrics.rollupL2RelayerCommitThroughput.Add(float64(totalGasUsed))
r.metrics.rollupL2RelayerProcessPendingBatchSuccessTotal.Add(float64(len(batchesToSubmit)))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Might also want to monitor the time series of len(batchesToSubmit) (to see average submission).


log.Info("Sent the commitBatches tx to layer1", "batches count", len(batchesToSubmit), "start index", firstBatch.Index, "start hash", firstBatch.Hash, "end index", lastBatch.Index, "end hash", lastBatch.Hash, "tx hash", txHash.String())
}

func (r *Layer2Relayer) contextIDFromBatches(batches []*dbBatchWithChunksAndParent) string {
contextIDs := []string{"v6"}

for _, batch := range batches {
contextIDs = append(contextIDs, batch.Batch.Hash)
}

return strings.Join(contextIDs, "-")
}

func (r *Layer2Relayer) batchHashesFromContextID(contextID string) []string {
if strings.HasPrefix(contextID, "v6-") {
return strings.Split(contextID, "-")[1:]
}

return []string{contextID}
}

type dbBatchWithChunksAndParent struct {
Batch *orm.Batch
Chunks []*orm.Chunk
ParentBatch *orm.Batch
}

func (r *Layer2Relayer) processPendingBatchesV4(dbBatches []*orm.Batch) {
for _, dbBatch := range dbBatches {
r.metrics.rollupL2RelayerProcessPendingBatchTotal.Inc()

Expand Down Expand Up @@ -432,7 +599,7 @@ func (r *Layer2Relayer) ProcessPendingBatches() {
return
}
default:
log.Error("unsupported codec version", "codecVersion", codecVersion)
log.Error("unsupported codec version in processPendingBatchesV4", "codecVersion", codecVersion)
return
}

Expand All @@ -444,7 +611,7 @@ func (r *Layer2Relayer) ProcessPendingBatches() {
log.Warn("Batch commit previously failed, using eth_estimateGas for the re-submission", "hash", dbBatch.Hash)
}

txHash, err := r.commitSender.SendTransaction(dbBatch.Hash, &r.cfg.RollupContractAddress, calldata, blob, fallbackGasLimit)
txHash, err := r.commitSender.SendTransaction(dbBatch.Hash, &r.cfg.RollupContractAddress, calldata, []*kzg4844.Blob{blob}, fallbackGasLimit)
if err != nil {
if errors.Is(err, sender.ErrTooManyPendingBlobTxs) {
r.metrics.rollupL2RelayerProcessPendingBatchErrTooManyPendingBlobTxsTotal.Inc()
Expand Down Expand Up @@ -730,9 +897,17 @@ func (r *Layer2Relayer) handleConfirmation(cfm *sender.Confirmation) {
log.Warn("CommitBatchTxType transaction confirmed but failed in layer1", "confirmation", cfm)
}

err := r.batchOrm.UpdateCommitTxHashAndRollupStatus(r.ctx, cfm.ContextID, cfm.TxHash.String(), status)
if err != nil {
log.Warn("UpdateCommitTxHashAndRollupStatus failed", "confirmation", cfm, "err", err)
batchHashes := r.batchHashesFromContextID(cfm.ContextID)
if err := r.db.Transaction(func(dbTX *gorm.DB) error {
for _, batchHash := range batchHashes {
if err := r.batchOrm.UpdateCommitTxHashAndRollupStatus(r.ctx, batchHash, cfm.TxHash.String(), status, dbTX); err != nil {
return fmt.Errorf("UpdateCommitTxHashAndRollupStatus failed, batchHash: %s, txHash: %s, status: %s, err: %w", batchHash, cfm.TxHash.String(), status, err)
}
}

return nil
}); err != nil {
log.Warn("failed to update confirmation status for batches", "confirmation", cfm, "err", err)
}
case types.SenderTypeFinalizeBatch:
if strings.HasPrefix(cfm.ContextID, "finalizeBundle-") {
Expand Down Expand Up @@ -868,6 +1043,71 @@ func (r *Layer2Relayer) constructCommitBatchPayloadCodecV4(dbBatch *orm.Batch, d
return calldata, daBatch.Blob(), nil
}

func (r *Layer2Relayer) constructCommitBatchPayloadCodecV6(batchesToSubmit []*dbBatchWithChunksAndParent) ([]byte, []*kzg4844.Blob, uint64, uint64, error) {
var maxBlockHeight uint64
var totalGasUsed uint64
blobs := make([]*kzg4844.Blob, len(batchesToSubmit))

version := encoding.CodecVersion(batchesToSubmit[0].Batch.CodecVersion)
var firstParentBatch *orm.Batch
// construct blobs
for _, b := range batchesToSubmit {
// double check that all batches have the same version
batchVersion := encoding.CodecVersion(b.Batch.CodecVersion)
if batchVersion != version {
return nil, nil, 0, 0, fmt.Errorf("codec version mismatch, expected: %d, got: %d for batches %d and %d", version, batchVersion, batchesToSubmit[0].Batch.Index, b.Batch.Index)
}

if firstParentBatch == nil {
firstParentBatch = b.ParentBatch
}

chunks := make([]*encoding.Chunk, len(b.Chunks))
for i, c := range b.Chunks {
blocks, err := r.l2BlockOrm.GetL2BlocksInRange(r.ctx, c.StartBlockNumber, c.EndBlockNumber)
if err != nil {
return nil, nil, 0, 0, fmt.Errorf("failed to get blocks in range for batch %d: %w", b.Batch.Index, err)
}
chunks[i] = &encoding.Chunk{Blocks: blocks}

if c.EndBlockNumber > maxBlockHeight {
maxBlockHeight = c.EndBlockNumber
}
totalGasUsed += c.TotalL2TxGas
}

encodingBatch := &encoding.Batch{
Index: b.Batch.Index,
TotalL1MessagePoppedBefore: b.Chunks[0].TotalL1MessagesPoppedBefore,
ParentBatchHash: common.HexToHash(b.ParentBatch.Hash),
Chunks: chunks,
}

codec, err := encoding.CodecFromVersion(version)
if err != nil {
return nil, nil, 0, 0, fmt.Errorf("failed to get codec from version %d, err: %w", b.Batch.CodecVersion, err)
}

daBatch, err := codec.NewDABatch(encodingBatch)
if err != nil {
return nil, nil, 0, 0, fmt.Errorf("failed to create DA batch: %w", err)
}

blobs = append(blobs, daBatch.Blob())
}

if firstParentBatch == nil {
return nil, nil, 0, 0, fmt.Errorf("firstParentBatch is nil")
}

// TODO: this needs to be updated once the contract interface is finalized
calldata, err := r.l1RollupABI.Pack("commitBatches", version, firstParentBatch.BatchHeader)
if err != nil {
return nil, nil, 0, 0, fmt.Errorf("failed to pack commitBatchWithBlobProof: %w", err)
}
return calldata, blobs, maxBlockHeight, totalGasUsed, nil
}

func (r *Layer2Relayer) constructFinalizeBundlePayloadCodecV4(dbBatch *orm.Batch, aggProof *message.BundleProof) ([]byte, error) {
if aggProof != nil { // finalizeBundle with proof.
calldata, packErr := r.l1RollupABI.Pack(
Expand Down
18 changes: 11 additions & 7 deletions rollup/internal/controller/sender/sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,15 +171,15 @@ func (s *Sender) getFeeData(target *common.Address, data []byte, sidecar *gethTy
}

// SendTransaction send a signed L2tL1 transaction.
func (s *Sender) SendTransaction(contextID string, target *common.Address, data []byte, blob *kzg4844.Blob, fallbackGasLimit uint64) (common.Hash, error) {
func (s *Sender) SendTransaction(contextID string, target *common.Address, data []byte, blobs []*kzg4844.Blob, fallbackGasLimit uint64) (common.Hash, error) {
s.metrics.sendTransactionTotal.WithLabelValues(s.service, s.name).Inc()
var (
feeData *FeeData
sidecar *gethTypes.BlobTxSidecar
err error
)

if blob != nil {
if blobs != nil {
// check that number of pending blob-carrying txs is not too big
if s.senderType == types.SenderTypeCommitBatch {
var numPendingTransactions int64
Expand All @@ -197,7 +197,7 @@ func (s *Sender) SendTransaction(contextID string, target *common.Address, data
}

}
sidecar, err = makeSidecar(blob)
sidecar, err = makeSidecar(blobs)
if err != nil {
log.Error("failed to make sidecar for blob transaction", "error", err)
return common.Hash{}, fmt.Errorf("failed to make sidecar for blob transaction, err: %w", err)
Expand Down Expand Up @@ -681,12 +681,16 @@ func (s *Sender) getBlockNumberAndBaseFeeAndBlobFee(ctx context.Context) (uint64
return header.Number.Uint64() - 1, baseFee, blobBaseFee, nil
}

func makeSidecar(blob *kzg4844.Blob) (*gethTypes.BlobTxSidecar, error) {
if blob == nil {
return nil, errors.New("blob cannot be nil")
func makeSidecar(blobsInput []*kzg4844.Blob) (*gethTypes.BlobTxSidecar, error) {
if blobsInput == nil {
return nil, errors.New("blobs cannot be nil")
}

blobs := make([]kzg4844.Blob, len(blobsInput))
for i, blob := range blobsInput {
blobs[i] = *blob
}

blobs := []kzg4844.Blob{*blob}
var commitments []kzg4844.Commitment
var proofs []kzg4844.Proof

Expand Down
Loading