diff --git a/rollup/go.mod b/rollup/go.mod index a46cb5aa3..ba1f50f2b 100644 --- a/rollup/go.mod +++ b/rollup/go.mod @@ -13,7 +13,7 @@ require ( github.com/holiman/uint256 v1.3.2 github.com/mitchellh/mapstructure v1.5.0 github.com/prometheus/client_golang v1.16.0 - github.com/scroll-tech/da-codec v0.1.3-0.20250110130755-bc9cd3c73290 + github.com/scroll-tech/da-codec v0.1.3-0.20250128015324-64133efc3843 github.com/scroll-tech/go-ethereum v1.10.14-0.20250103082839-ea3ec93d8c1e github.com/smartystreets/goconvey v1.8.0 github.com/spf13/viper v1.19.0 diff --git a/rollup/go.sum b/rollup/go.sum index 2c1a7ef3f..eed37ebcf 100644 --- a/rollup/go.sum +++ b/rollup/go.sum @@ -249,12 +249,8 @@ github.com/sagikazarmark/locafero v0.4.0 h1:HApY1R9zGo4DBgr7dqsTH/JJxLTTsOt7u6ke github.com/sagikazarmark/locafero v0.4.0/go.mod h1:Pe1W6UlPYUk/+wc/6KFhbORCfqzgYEpgQ3O5fPuL3H4= github.com/sagikazarmark/slog-shim v0.1.0 h1:diDBnUNK9N/354PgrxMywXnAwEr1QZcOr6gto+ugjYE= github.com/sagikazarmark/slog-shim v0.1.0/go.mod h1:SrcSrq8aKtyuqEI1uvTDTK1arOWRIczQRv+GVI1AkeQ= -github.com/scroll-tech/da-codec v0.1.3-0.20250107140325-1de9ee026d5e h1:ko29Qa/pu4XCR8XaRW490vQ8A8NhSSH+CQ8ELIq2qIU= -github.com/scroll-tech/da-codec v0.1.3-0.20250107140325-1de9ee026d5e/go.mod h1:+ho/ItbapMf2xM9YUv/SfGFg3AJx9lBD7ksR7T0ZVmM= -github.com/scroll-tech/da-codec v0.1.3-0.20250110130139-d2ba6261f55c h1:ivM9jYf0C/ppyqsxKxshfrAQXBm4Iz7j5ojd38/Y9+k= -github.com/scroll-tech/da-codec v0.1.3-0.20250110130139-d2ba6261f55c/go.mod h1:+ho/ItbapMf2xM9YUv/SfGFg3AJx9lBD7ksR7T0ZVmM= -github.com/scroll-tech/da-codec v0.1.3-0.20250110130755-bc9cd3c73290 h1:2f6h0+UwXpIZnjsSK/5/AGIwBsBl7FEyXQWMLv55ZG8= -github.com/scroll-tech/da-codec v0.1.3-0.20250110130755-bc9cd3c73290/go.mod h1:+ho/ItbapMf2xM9YUv/SfGFg3AJx9lBD7ksR7T0ZVmM= +github.com/scroll-tech/da-codec v0.1.3-0.20250128015324-64133efc3843 h1:Qd6bh5Cn5hqf/yyz4ucq4Z2qIStH4KUYsPNK/zfaAb8= +github.com/scroll-tech/da-codec v0.1.3-0.20250128015324-64133efc3843/go.mod h1:+ho/ItbapMf2xM9YUv/SfGFg3AJx9lBD7ksR7T0ZVmM= github.com/scroll-tech/go-ethereum v1.10.14-0.20250103082839-ea3ec93d8c1e h1:g8jtcGiHbjWYh/V7O245IDho3WfQT4CwEpBV+MhYDrg= github.com/scroll-tech/go-ethereum v1.10.14-0.20250103082839-ea3ec93d8c1e/go.mod h1:Ik3OBLl7cJxPC+CFyCBYNXBPek4wpdzkWehn/y5qLM8= github.com/scroll-tech/zktrie v0.8.4 h1:UagmnZ4Z3ITCk+aUq9NQZJNAwnWl4gSxsLb2Nl7IgRE= diff --git a/rollup/internal/config/relayer.go b/rollup/internal/config/relayer.go index f76010890..3e2e45c5c 100644 --- a/rollup/internal/config/relayer.go +++ b/rollup/internal/config/relayer.go @@ -29,6 +29,18 @@ type SenderConfig struct { TxType string `json:"tx_type"` // The maximum number of pending blob-carrying transactions MaxPendingBlobTxs int64 `json:"max_pending_blob_txs"` + + // Config for batch submission + BatchSubmission *BatchSubmission `json:"batch_submission"` +} + +type BatchSubmission struct { + // The minimum number of batches to submit in a single transaction. + MinBatches int `json:"min_batches"` + // The maximum number of batches to submit in a single transaction. + MaxBatches int `json:"max_batches"` + // The time in seconds after which a batch is considered stale and should be submitted ignoring the min batch count. + TimeoutSec int64 `json:"timeout"` } // ChainMonitor this config is used to get batch status from chain_monitor API. diff --git a/rollup/internal/controller/relayer/l2_relayer.go b/rollup/internal/controller/relayer/l2_relayer.go index f8e917dde..c86ca26ec 100644 --- a/rollup/internal/controller/relayer/l2_relayer.go +++ b/rollup/internal/controller/relayer/l2_relayer.go @@ -378,11 +378,178 @@ func (r *Layer2Relayer) ProcessGasPriceOracle() { // ProcessPendingBatches processes the pending batches by sending commitBatch transactions to layer 1. func (r *Layer2Relayer) ProcessPendingBatches() { // get pending batches from database in ascending order by their index. - dbBatches, err := r.batchOrm.GetFailedAndPendingBatches(r.ctx, 5) + dbBatches, err := r.batchOrm.GetFailedAndPendingBatches(r.ctx, max(5, r.cfg.SenderConfig.BatchSubmission.MaxBatches)) if err != nil { log.Error("Failed to fetch pending L2 batches", "err", err) return } + + var batchesToSubmit []*dbBatchWithChunksAndParent + var forceSubmit bool + for i, dbBatch := range dbBatches { + if i == 0 && encoding.CodecVersion(dbBatch.CodecVersion) < encoding.CodecV6 { + // if the first batch is not >= V6 then we need to submit batches one by one + r.processPendingBatchesV4(dbBatches) + return + } + + batchesToSubmitLen := len(batchesToSubmit) + var dbChunks []*orm.Chunk + var dbParentBatch *orm.Batch + + // Verify batches compatibility + { + dbChunks, err = r.chunkOrm.GetChunksInRange(r.ctx, dbBatch.StartChunkIndex, dbBatch.EndChunkIndex) + if err != nil { + log.Error("failed to get chunks in range", "err", err) + return + } + + // check codec version + for _, dbChunk := range dbChunks { + if dbBatch.CodecVersion != dbChunk.CodecVersion { + log.Error("batch codec version is different from chunk codec version", "batch index", dbBatch.Index, "chunk index", dbChunk.Index, "batch codec version", dbBatch.CodecVersion, "chunk codec version", dbChunk.CodecVersion) + return + } + } + + if dbBatch.Index == 0 { + log.Error("invalid args: batch index is 0, should only happen in committing genesis batch") + return + } + + dbParentBatch, err = r.batchOrm.GetBatchByIndex(r.ctx, dbBatch.Index-1) + if err != nil { + log.Error("failed to get parent batch header", "err", err) + return + } + + if dbParentBatch.CodecVersion > dbBatch.CodecVersion { + log.Error("parent batch codec version is greater than current batch codec version", "index", dbBatch.Index, "hash", dbBatch.Hash, "parent codec version", dbParentBatch.CodecVersion, "current codec version", dbBatch.CodecVersion) + return + } + + // make sure we commit batches of the same codec version together. + // If we encounter a batch with a different codec version, we stop here and will commit the batches we have so far. + // The next call of ProcessPendingBatches will then start with the batch with the different codec version. + if batchesToSubmitLen > 0 && batchesToSubmit[batchesToSubmitLen-1].Batch.CodecVersion != dbBatch.CodecVersion { + break + } + } + + // if one of the batches is too old, we force submit all batches that we have so far in the next step + if !forceSubmit && time.Since(dbBatch.CreatedAt) > time.Duration(r.cfg.SenderConfig.BatchSubmission.TimeoutSec)*time.Second { + forceSubmit = true + } + + if batchesToSubmitLen <= r.cfg.SenderConfig.BatchSubmission.MaxBatches { + batchesToSubmit = append(batchesToSubmit, &dbBatchWithChunksAndParent{ + Batch: dbBatch, + Chunks: dbChunks, + ParentBatch: dbParentBatch, + }) + } + } + + if !forceSubmit && len(batchesToSubmit) < r.cfg.SenderConfig.BatchSubmission.MinBatches { + log.Debug("Not enough batches to submit", "count", len(batchesToSubmit), "minBatches", r.cfg.SenderConfig.BatchSubmission.MinBatches, "maxBatches", r.cfg.SenderConfig.BatchSubmission.MaxBatches) + return + } + + if forceSubmit { + log.Info("Forcing submission of batches due to timeout", "batch index", batchesToSubmit[0].Batch.Index, "created at", batchesToSubmit[0].Batch.CreatedAt) + } + + // We have at least 1 batch to commit + firstBatch := batchesToSubmit[0].Batch + lastBatch := batchesToSubmit[len(batchesToSubmit)-1].Batch + + var calldata []byte + var blobs []*kzg4844.Blob + var maxBlockHeight uint64 + var totalGasUsed uint64 + + codecVersion := encoding.CodecVersion(firstBatch.CodecVersion) + switch codecVersion { + case encoding.CodecV6: + calldata, blobs, maxBlockHeight, totalGasUsed, err = r.constructCommitBatchPayloadCodecV6(batchesToSubmit) + if err != nil { + log.Error("failed to construct commitBatchWithBlobProof payload for V6", "codecVersion", codecVersion, "start index", firstBatch.Index, "end index", lastBatch.Index, "err", err) + return + } + default: + log.Error("unsupported codec version in ProcessPendingBatches", "codecVersion", codecVersion, "start index", firstBatch, "end index", lastBatch.Index) + return + } + + txHash, err := r.commitSender.SendTransaction(r.contextIDFromBatches(batchesToSubmit), &r.cfg.RollupContractAddress, calldata, blobs, 0) + if err != nil { + if errors.Is(err, sender.ErrTooManyPendingBlobTxs) { + r.metrics.rollupL2RelayerProcessPendingBatchErrTooManyPendingBlobTxsTotal.Inc() + log.Debug( + "Skipped sending commitBatch tx to L1: too many pending blob txs", + "maxPending", r.cfg.SenderConfig.MaxPendingBlobTxs, + "err", err, + ) + return + } + log.Error( + "Failed to send commitBatch tx to layer1", + "start index", firstBatch.Index, + "start hash", firstBatch.Hash, + "end index", lastBatch.Index, + "end hash", lastBatch.Hash, + "RollupContractAddress", r.cfg.RollupContractAddress, + "err", err, + "calldata", common.Bytes2Hex(calldata), + ) + return + } + + if err = r.db.Transaction(func(dbTX *gorm.DB) error { + for _, dbBatch := range batchesToSubmit { + if err = r.batchOrm.UpdateCommitTxHashAndRollupStatus(r.ctx, dbBatch.Batch.Hash, txHash.String(), types.RollupCommitting, dbTX); err != nil { + return fmt.Errorf("UpdateCommitTxHashAndRollupStatus failed for batch %d: %s, err %v", dbBatch.Batch.Index, dbBatch.Batch.Hash, err) + } + } + + return nil + }); err != nil { + log.Error("failed to update status for batches to RollupCommitting", "err", err) + } + + r.metrics.rollupL2RelayerCommitBlockHeight.Set(float64(maxBlockHeight)) + r.metrics.rollupL2RelayerCommitThroughput.Add(float64(totalGasUsed)) + r.metrics.rollupL2RelayerProcessPendingBatchSuccessTotal.Add(float64(len(batchesToSubmit))) + + log.Info("Sent the commitBatches tx to layer1", "batches count", len(batchesToSubmit), "start index", firstBatch.Index, "start hash", firstBatch.Hash, "end index", lastBatch.Index, "end hash", lastBatch.Hash, "tx hash", txHash.String()) +} + +func (r *Layer2Relayer) contextIDFromBatches(batches []*dbBatchWithChunksAndParent) string { + contextIDs := []string{"v6"} + + for _, batch := range batches { + contextIDs = append(contextIDs, batch.Batch.Hash) + } + + return strings.Join(contextIDs, "-") +} + +func (r *Layer2Relayer) batchHashesFromContextID(contextID string) []string { + if strings.HasPrefix(contextID, "v6-") { + return strings.Split(contextID, "-")[1:] + } + + return []string{contextID} +} + +type dbBatchWithChunksAndParent struct { + Batch *orm.Batch + Chunks []*orm.Chunk + ParentBatch *orm.Batch +} + +func (r *Layer2Relayer) processPendingBatchesV4(dbBatches []*orm.Batch) { for _, dbBatch := range dbBatches { r.metrics.rollupL2RelayerProcessPendingBatchTotal.Inc() @@ -437,7 +604,7 @@ func (r *Layer2Relayer) ProcessPendingBatches() { return } default: - log.Error("unsupported codec version", "codecVersion", codecVersion) + log.Error("unsupported codec version in processPendingBatchesV4", "codecVersion", codecVersion) return } @@ -449,7 +616,7 @@ func (r *Layer2Relayer) ProcessPendingBatches() { log.Warn("Batch commit previously failed, using eth_estimateGas for the re-submission", "hash", dbBatch.Hash) } - txHash, err := r.commitSender.SendTransaction(dbBatch.Hash, &r.cfg.RollupContractAddress, calldata, blob, fallbackGasLimit) + txHash, err := r.commitSender.SendTransaction(dbBatch.Hash, &r.cfg.RollupContractAddress, calldata, []*kzg4844.Blob{blob}, fallbackGasLimit) if err != nil { if errors.Is(err, sender.ErrTooManyPendingBlobTxs) { r.metrics.rollupL2RelayerProcessPendingBatchErrTooManyPendingBlobTxsTotal.Inc() @@ -749,9 +916,17 @@ func (r *Layer2Relayer) handleConfirmation(cfm *sender.Confirmation) { log.Warn("CommitBatchTxType transaction confirmed but failed in layer1", "confirmation", cfm) } - err := r.batchOrm.UpdateCommitTxHashAndRollupStatus(r.ctx, cfm.ContextID, cfm.TxHash.String(), status) - if err != nil { - log.Warn("UpdateCommitTxHashAndRollupStatus failed", "confirmation", cfm, "err", err) + batchHashes := r.batchHashesFromContextID(cfm.ContextID) + if err := r.db.Transaction(func(dbTX *gorm.DB) error { + for _, batchHash := range batchHashes { + if err := r.batchOrm.UpdateCommitTxHashAndRollupStatus(r.ctx, batchHash, cfm.TxHash.String(), status, dbTX); err != nil { + return fmt.Errorf("UpdateCommitTxHashAndRollupStatus failed, batchHash: %s, txHash: %s, status: %s, err: %w", batchHash, cfm.TxHash.String(), status, err) + } + } + + return nil + }); err != nil { + log.Warn("failed to update confirmation status for batches", "confirmation", cfm, "err", err) } case types.SenderTypeFinalizeBatch: if strings.HasPrefix(cfm.ContextID, "finalizeBundle-") { @@ -887,6 +1062,71 @@ func (r *Layer2Relayer) constructCommitBatchPayloadCodecV4(dbBatch *orm.Batch, d return calldata, daBatch.Blob(), nil } +func (r *Layer2Relayer) constructCommitBatchPayloadCodecV6(batchesToSubmit []*dbBatchWithChunksAndParent) ([]byte, []*kzg4844.Blob, uint64, uint64, error) { + var maxBlockHeight uint64 + var totalGasUsed uint64 + blobs := make([]*kzg4844.Blob, len(batchesToSubmit)) + + version := encoding.CodecVersion(batchesToSubmit[0].Batch.CodecVersion) + var firstParentBatch *orm.Batch + // construct blobs + for _, b := range batchesToSubmit { + // double check that all batches have the same version + batchVersion := encoding.CodecVersion(b.Batch.CodecVersion) + if batchVersion != version { + return nil, nil, 0, 0, fmt.Errorf("codec version mismatch, expected: %d, got: %d for batches %d and %d", version, batchVersion, batchesToSubmit[0].Batch.Index, b.Batch.Index) + } + + if firstParentBatch == nil { + firstParentBatch = b.ParentBatch + } + + chunks := make([]*encoding.Chunk, len(b.Chunks)) + for i, c := range b.Chunks { + blocks, err := r.l2BlockOrm.GetL2BlocksInRange(r.ctx, c.StartBlockNumber, c.EndBlockNumber) + if err != nil { + return nil, nil, 0, 0, fmt.Errorf("failed to get blocks in range for batch %d: %w", b.Batch.Index, err) + } + chunks[i] = &encoding.Chunk{Blocks: blocks} + + if c.EndBlockNumber > maxBlockHeight { + maxBlockHeight = c.EndBlockNumber + } + totalGasUsed += c.TotalL2TxGas + } + + encodingBatch := &encoding.Batch{ + Index: b.Batch.Index, + TotalL1MessagePoppedBefore: b.Chunks[0].TotalL1MessagesPoppedBefore, + ParentBatchHash: common.HexToHash(b.ParentBatch.Hash), + Chunks: chunks, + } + + codec, err := encoding.CodecFromVersion(version) + if err != nil { + return nil, nil, 0, 0, fmt.Errorf("failed to get codec from version %d, err: %w", b.Batch.CodecVersion, err) + } + + daBatch, err := codec.NewDABatch(encodingBatch) + if err != nil { + return nil, nil, 0, 0, fmt.Errorf("failed to create DA batch: %w", err) + } + + blobs = append(blobs, daBatch.Blob()) + } + + if firstParentBatch == nil { + return nil, nil, 0, 0, fmt.Errorf("firstParentBatch is nil") + } + + // TODO: this needs to be updated once the contract interface is finalized + calldata, err := r.l1RollupABI.Pack("commitBatches", version, firstParentBatch.BatchHeader) + if err != nil { + return nil, nil, 0, 0, fmt.Errorf("failed to pack commitBatchWithBlobProof: %w", err) + } + return calldata, blobs, maxBlockHeight, totalGasUsed, nil +} + func (r *Layer2Relayer) constructFinalizeBundlePayloadCodecV4(dbBatch *orm.Batch, aggProof *message.BundleProof) ([]byte, error) { if aggProof != nil { // finalizeBundle with proof. calldata, packErr := r.l1RollupABI.Pack( diff --git a/rollup/internal/controller/sender/sender.go b/rollup/internal/controller/sender/sender.go index 657e02238..4138d5a36 100644 --- a/rollup/internal/controller/sender/sender.go +++ b/rollup/internal/controller/sender/sender.go @@ -171,7 +171,7 @@ func (s *Sender) getFeeData(target *common.Address, data []byte, sidecar *gethTy } // SendTransaction send a signed L2tL1 transaction. -func (s *Sender) SendTransaction(contextID string, target *common.Address, data []byte, blob *kzg4844.Blob, fallbackGasLimit uint64) (common.Hash, error) { +func (s *Sender) SendTransaction(contextID string, target *common.Address, data []byte, blobs []*kzg4844.Blob, fallbackGasLimit uint64) (common.Hash, error) { s.metrics.sendTransactionTotal.WithLabelValues(s.service, s.name).Inc() var ( feeData *FeeData @@ -179,7 +179,7 @@ func (s *Sender) SendTransaction(contextID string, target *common.Address, data err error ) - if blob != nil { + if blobs != nil { // check that number of pending blob-carrying txs is not too big if s.senderType == types.SenderTypeCommitBatch { var numPendingTransactions int64 @@ -197,7 +197,7 @@ func (s *Sender) SendTransaction(contextID string, target *common.Address, data } } - sidecar, err = makeSidecar(blob) + sidecar, err = makeSidecar(blobs) if err != nil { log.Error("failed to make sidecar for blob transaction", "error", err) return common.Hash{}, fmt.Errorf("failed to make sidecar for blob transaction, err: %w", err) @@ -681,12 +681,16 @@ func (s *Sender) getBlockNumberAndBaseFeeAndBlobFee(ctx context.Context) (uint64 return header.Number.Uint64() - 1, baseFee, blobBaseFee, nil } -func makeSidecar(blob *kzg4844.Blob) (*gethTypes.BlobTxSidecar, error) { - if blob == nil { - return nil, errors.New("blob cannot be nil") +func makeSidecar(blobsInput []*kzg4844.Blob) (*gethTypes.BlobTxSidecar, error) { + if blobsInput == nil { + return nil, errors.New("blobs cannot be nil") + } + + blobs := make([]kzg4844.Blob, len(blobsInput)) + for i, blob := range blobsInput { + blobs[i] = *blob } - blobs := []kzg4844.Blob{*blob} var commitments []kzg4844.Commitment var proofs []kzg4844.Proof diff --git a/rollup/internal/orm/batch.go b/rollup/internal/orm/batch.go index d965015eb..bc93366cd 100644 --- a/rollup/internal/orm/batch.go +++ b/rollup/internal/orm/batch.go @@ -395,7 +395,7 @@ func (o *Batch) UpdateRollupStatus(ctx context.Context, hash string, status type } // UpdateCommitTxHashAndRollupStatus updates the commit transaction hash and rollup status for a batch. -func (o *Batch) UpdateCommitTxHashAndRollupStatus(ctx context.Context, hash string, commitTxHash string, status types.RollupStatus) error { +func (o *Batch) UpdateCommitTxHashAndRollupStatus(ctx context.Context, hash string, commitTxHash string, status types.RollupStatus, dbTX ...*gorm.DB) error { updateFields := make(map[string]interface{}) updateFields["commit_tx_hash"] = commitTxHash updateFields["rollup_status"] = int(status) @@ -403,7 +403,11 @@ func (o *Batch) UpdateCommitTxHashAndRollupStatus(ctx context.Context, hash stri updateFields["committed_at"] = utils.NowUTC() } - db := o.db.WithContext(ctx) + db := o.db + if len(dbTX) > 0 && dbTX[0] != nil { + db = dbTX[0] + } + db = db.WithContext(ctx) db = db.Model(&Batch{}) db = db.Where("hash", hash)