diff --git a/arbnode/blockmetadata.go b/arbnode/blockmetadata.go new file mode 100644 index 0000000000..96e02e07b8 --- /dev/null +++ b/arbnode/blockmetadata.go @@ -0,0 +1,151 @@ +package arbnode + +import ( + "bytes" + "context" + "encoding/binary" + "time" + + "github.com/spf13/pflag" + + "github.com/ethereum/go-ethereum/ethdb" + "github.com/ethereum/go-ethereum/log" + "github.com/ethereum/go-ethereum/rpc" + + "github.com/offchainlabs/nitro/arbutil" + "github.com/offchainlabs/nitro/execution" + "github.com/offchainlabs/nitro/execution/gethexec" + "github.com/offchainlabs/nitro/util" + "github.com/offchainlabs/nitro/util/rpcclient" + "github.com/offchainlabs/nitro/util/stopwaiter" +) + +type BlockMetadataFetcherConfig struct { + Enable bool `koanf:"enable"` + Source rpcclient.ClientConfig `koanf:"source" reload:"hot"` + SyncInterval time.Duration `koanf:"sync-interval"` + APIBlocksLimit uint64 `koanf:"api-blocks-limit"` +} + +var DefaultBlockMetadataFetcherConfig = BlockMetadataFetcherConfig{ + Enable: false, + Source: rpcclient.DefaultClientConfig, + SyncInterval: time.Minute * 5, + APIBlocksLimit: 100, +} + +func BlockMetadataFetcherConfigAddOptions(prefix string, f *pflag.FlagSet) { + f.Bool(prefix+".enable", DefaultBlockMetadataFetcherConfig.Enable, "enable syncing blockMetadata using a bulk blockMetadata api. If the source doesn't have the missing blockMetadata, we keep retyring in every sync-interval (default=5mins) duration") + rpcclient.RPCClientAddOptions(prefix+".source", f, &DefaultBlockMetadataFetcherConfig.Source) + f.Duration(prefix+".sync-interval", DefaultBlockMetadataFetcherConfig.SyncInterval, "interval at which blockMetadata are synced regularly") + f.Uint64(prefix+".api-blocks-limit", DefaultBlockMetadataFetcherConfig.APIBlocksLimit, "maximum number of blocks allowed to be queried for blockMetadata per arb_getRawBlockMetadata query.\n"+ + "This should be set lesser than or equal to the limit on the api provider side") +} + +type BlockMetadataFetcher struct { + stopwaiter.StopWaiter + config BlockMetadataFetcherConfig + db ethdb.Database + client *rpcclient.RpcClient + exec execution.ExecutionClient +} + +func NewBlockMetadataFetcher(ctx context.Context, c BlockMetadataFetcherConfig, db ethdb.Database, exec execution.ExecutionClient) (*BlockMetadataFetcher, error) { + client := rpcclient.NewRpcClient(func() *rpcclient.ClientConfig { return &c.Source }, nil) + if err := client.Start(ctx); err != nil { + return nil, err + } + return &BlockMetadataFetcher{ + config: c, + db: db, + client: client, + exec: exec, + }, nil +} + +func (b *BlockMetadataFetcher) fetch(ctx context.Context, fromBlock, toBlock uint64) ([]gethexec.NumberAndBlockMetadata, error) { + var result []gethexec.NumberAndBlockMetadata + // #nosec G115 + err := b.client.CallContext(ctx, &result, "arb_getRawBlockMetadata", rpc.BlockNumber(fromBlock), rpc.BlockNumber(toBlock)) + if err != nil { + return nil, err + } + return result, nil +} + +func (b *BlockMetadataFetcher) persistBlockMetadata(query []uint64, result []gethexec.NumberAndBlockMetadata) error { + batch := b.db.NewBatch() + queryMap := util.ArrayToSet(query) + for _, elem := range result { + pos, err := b.exec.BlockNumberToMessageIndex(elem.BlockNumber) + if err != nil { + return err + } + if _, ok := queryMap[uint64(pos)]; ok { + if err := batch.Put(dbKey(blockMetadataInputFeedPrefix, uint64(pos)), elem.RawMetadata); err != nil { + return err + } + if err := batch.Delete(dbKey(missingBlockMetadataInputFeedPrefix, uint64(pos))); err != nil { + return err + } + // If we reached the ideal batch size, commit and reset + if batch.ValueSize() >= ethdb.IdealBatchSize { + if err := batch.Write(); err != nil { + return err + } + batch.Reset() + } + } + } + return batch.Write() +} + +func (b *BlockMetadataFetcher) Update(ctx context.Context) time.Duration { + handleQuery := func(query []uint64) bool { + result, err := b.fetch( + ctx, + b.exec.MessageIndexToBlockNumber(arbutil.MessageIndex(query[0])), + b.exec.MessageIndexToBlockNumber(arbutil.MessageIndex(query[len(query)-1])), + ) + if err != nil { + log.Error("Error getting result from bulk blockMetadata API", "err", err) + return false + } + if err = b.persistBlockMetadata(query, result); err != nil { + log.Error("Error committing result from bulk blockMetadata API to ArbDB", "err", err) + return false + } + return true + } + iter := b.db.NewIterator(missingBlockMetadataInputFeedPrefix, nil) + defer iter.Release() + var query []uint64 + for iter.Next() { + keyBytes := bytes.TrimPrefix(iter.Key(), missingBlockMetadataInputFeedPrefix) + query = append(query, binary.BigEndian.Uint64(keyBytes)) + end := len(query) - 1 + if query[end]-query[0]+1 >= uint64(b.config.APIBlocksLimit) { + if query[end]-query[0]+1 > uint64(b.config.APIBlocksLimit) && len(query) >= 2 { + end -= 1 + } + if success := handleQuery(query[:end+1]); !success { + return b.config.SyncInterval + } + query = query[end+1:] + } + } + if len(query) > 0 { + _ = handleQuery(query) + } + return b.config.SyncInterval +} + +func (b *BlockMetadataFetcher) Start(ctx context.Context) { + b.StopWaiter.Start(ctx, b) + b.CallIteratively(b.Update) +} + +func (b *BlockMetadataFetcher) StopAndWait() { + b.StopWaiter.StopAndWait() + b.client.Close() +} diff --git a/arbnode/node.go b/arbnode/node.go index 84468de4a4..8bd63bec11 100644 --- a/arbnode/node.go +++ b/arbnode/node.go @@ -82,23 +82,24 @@ func GenerateRollupConfig(prod bool, wasmModuleRoot common.Hash, rollupOwner com } type Config struct { - Sequencer bool `koanf:"sequencer"` - ParentChainReader headerreader.Config `koanf:"parent-chain-reader" reload:"hot"` - InboxReader InboxReaderConfig `koanf:"inbox-reader" reload:"hot"` - DelayedSequencer DelayedSequencerConfig `koanf:"delayed-sequencer" reload:"hot"` - BatchPoster BatchPosterConfig `koanf:"batch-poster" reload:"hot"` - MessagePruner MessagePrunerConfig `koanf:"message-pruner" reload:"hot"` - BlockValidator staker.BlockValidatorConfig `koanf:"block-validator" reload:"hot"` - Feed broadcastclient.FeedConfig `koanf:"feed" reload:"hot"` - Staker legacystaker.L1ValidatorConfig `koanf:"staker" reload:"hot"` - Bold boldstaker.BoldConfig `koanf:"bold"` - SeqCoordinator SeqCoordinatorConfig `koanf:"seq-coordinator"` - DataAvailability das.DataAvailabilityConfig `koanf:"data-availability"` - SyncMonitor SyncMonitorConfig `koanf:"sync-monitor"` - Dangerous DangerousConfig `koanf:"dangerous"` - TransactionStreamer TransactionStreamerConfig `koanf:"transaction-streamer" reload:"hot"` - Maintenance MaintenanceConfig `koanf:"maintenance" reload:"hot"` - ResourceMgmt resourcemanager.Config `koanf:"resource-mgmt" reload:"hot"` + Sequencer bool `koanf:"sequencer"` + ParentChainReader headerreader.Config `koanf:"parent-chain-reader" reload:"hot"` + InboxReader InboxReaderConfig `koanf:"inbox-reader" reload:"hot"` + DelayedSequencer DelayedSequencerConfig `koanf:"delayed-sequencer" reload:"hot"` + BatchPoster BatchPosterConfig `koanf:"batch-poster" reload:"hot"` + MessagePruner MessagePrunerConfig `koanf:"message-pruner" reload:"hot"` + BlockValidator staker.BlockValidatorConfig `koanf:"block-validator" reload:"hot"` + Feed broadcastclient.FeedConfig `koanf:"feed" reload:"hot"` + Staker legacystaker.L1ValidatorConfig `koanf:"staker" reload:"hot"` + Bold boldstaker.BoldConfig `koanf:"bold"` + SeqCoordinator SeqCoordinatorConfig `koanf:"seq-coordinator"` + DataAvailability das.DataAvailabilityConfig `koanf:"data-availability"` + SyncMonitor SyncMonitorConfig `koanf:"sync-monitor"` + Dangerous DangerousConfig `koanf:"dangerous"` + TransactionStreamer TransactionStreamerConfig `koanf:"transaction-streamer" reload:"hot"` + Maintenance MaintenanceConfig `koanf:"maintenance" reload:"hot"` + ResourceMgmt resourcemanager.Config `koanf:"resource-mgmt" reload:"hot"` + BlockMetadataFetcher BlockMetadataFetcherConfig `koanf:"block-metadata-fetcher" reload:"hot"` // SnapSyncConfig is only used for testing purposes, these should not be configured in production. SnapSyncTest SnapSyncConfig } @@ -135,6 +136,12 @@ func (c *Config) Validate() error { if err := c.Staker.Validate(); err != nil { return err } + if c.Sequencer && c.TransactionStreamer.TrackBlockMetadataFrom == 0 { + return errors.New("when sequencer is enabled track-block-metadata-from should be set as well") + } + if c.TransactionStreamer.TrackBlockMetadataFrom != 0 && !c.BlockMetadataFetcher.Enable { + log.Warn("track-block-metadata-from is set but blockMetadata fetcher is not enabled") + } return nil } @@ -165,27 +172,29 @@ func ConfigAddOptions(prefix string, f *flag.FlagSet, feedInputEnable bool, feed DangerousConfigAddOptions(prefix+".dangerous", f) TransactionStreamerConfigAddOptions(prefix+".transaction-streamer", f) MaintenanceConfigAddOptions(prefix+".maintenance", f) + BlockMetadataFetcherConfigAddOptions(prefix+".block-metadata-fetcher", f) } var ConfigDefault = Config{ - Sequencer: false, - ParentChainReader: headerreader.DefaultConfig, - InboxReader: DefaultInboxReaderConfig, - DelayedSequencer: DefaultDelayedSequencerConfig, - BatchPoster: DefaultBatchPosterConfig, - MessagePruner: DefaultMessagePrunerConfig, - BlockValidator: staker.DefaultBlockValidatorConfig, - Feed: broadcastclient.FeedConfigDefault, - Staker: legacystaker.DefaultL1ValidatorConfig, - Bold: boldstaker.DefaultBoldConfig, - SeqCoordinator: DefaultSeqCoordinatorConfig, - DataAvailability: das.DefaultDataAvailabilityConfig, - SyncMonitor: DefaultSyncMonitorConfig, - Dangerous: DefaultDangerousConfig, - TransactionStreamer: DefaultTransactionStreamerConfig, - ResourceMgmt: resourcemanager.DefaultConfig, - Maintenance: DefaultMaintenanceConfig, - SnapSyncTest: DefaultSnapSyncConfig, + Sequencer: false, + ParentChainReader: headerreader.DefaultConfig, + InboxReader: DefaultInboxReaderConfig, + DelayedSequencer: DefaultDelayedSequencerConfig, + BatchPoster: DefaultBatchPosterConfig, + MessagePruner: DefaultMessagePrunerConfig, + BlockValidator: staker.DefaultBlockValidatorConfig, + Feed: broadcastclient.FeedConfigDefault, + Staker: legacystaker.DefaultL1ValidatorConfig, + Bold: boldstaker.DefaultBoldConfig, + SeqCoordinator: DefaultSeqCoordinatorConfig, + DataAvailability: das.DefaultDataAvailabilityConfig, + SyncMonitor: DefaultSyncMonitorConfig, + Dangerous: DefaultDangerousConfig, + TransactionStreamer: DefaultTransactionStreamerConfig, + ResourceMgmt: resourcemanager.DefaultConfig, + Maintenance: DefaultMaintenanceConfig, + BlockMetadataFetcher: DefaultBlockMetadataFetcherConfig, + SnapSyncTest: DefaultSnapSyncConfig, } func ConfigDefaultL1Test() *Config { @@ -195,6 +204,7 @@ func ConfigDefaultL1Test() *Config { config.SeqCoordinator = TestSeqCoordinatorConfig config.Sequencer = true config.Dangerous.NoSequencerCoordinator = true + config.TransactionStreamer.TrackBlockMetadataFrom = 1 return config } @@ -280,6 +290,7 @@ type Node struct { MaintenanceRunner *MaintenanceRunner DASLifecycleManager *das.LifecycleManager SyncMonitor *SyncMonitor + blockMetadataFetcher *BlockMetadataFetcher configFetcher ConfigFetcher ctx context.Context } @@ -511,6 +522,14 @@ func createNodeImpl( } } + var blockMetadataFetcher *BlockMetadataFetcher + if config.BlockMetadataFetcher.Enable { + blockMetadataFetcher, err = NewBlockMetadataFetcher(ctx, config.BlockMetadataFetcher, arbDb, exec) + if err != nil { + return nil, err + } + } + if !config.ParentChainReader.Enable { return &Node{ ArbDB: arbDb, @@ -534,6 +553,7 @@ func createNodeImpl( MaintenanceRunner: maintenanceRunner, DASLifecycleManager: nil, SyncMonitor: syncMonitor, + blockMetadataFetcher: blockMetadataFetcher, configFetcher: configFetcher, ctx: ctx, }, nil @@ -767,6 +787,7 @@ func createNodeImpl( MaintenanceRunner: maintenanceRunner, DASLifecycleManager: dasLifecycleManager, SyncMonitor: syncMonitor, + blockMetadataFetcher: blockMetadataFetcher, configFetcher: configFetcher, ctx: ctx, }, nil @@ -950,6 +971,9 @@ func (n *Node) Start(ctx context.Context) error { n.BroadcastClients.Start(ctx) }() } + if n.blockMetadataFetcher != nil { + n.blockMetadataFetcher.Start(ctx) + } if n.configFetcher != nil { n.configFetcher.Start(ctx) } diff --git a/arbnode/schema.go b/arbnode/schema.go index 486afb20ae..acf54c9203 100644 --- a/arbnode/schema.go +++ b/arbnode/schema.go @@ -4,15 +4,16 @@ package arbnode var ( - messagePrefix []byte = []byte("m") // maps a message sequence number to a message - blockHashInputFeedPrefix []byte = []byte("b") // maps a message sequence number to a block hash received through the input feed - blockMetadataInputFeedPrefix []byte = []byte("t") // maps a message sequence number to a blockMetaData byte array received through the input feed - messageResultPrefix []byte = []byte("r") // maps a message sequence number to a message result - legacyDelayedMessagePrefix []byte = []byte("d") // maps a delayed sequence number to an accumulator and a message as serialized on L1 - rlpDelayedMessagePrefix []byte = []byte("e") // maps a delayed sequence number to an accumulator and an RLP encoded message - parentChainBlockNumberPrefix []byte = []byte("p") // maps a delayed sequence number to a parent chain block number - sequencerBatchMetaPrefix []byte = []byte("s") // maps a batch sequence number to BatchMetadata - delayedSequencedPrefix []byte = []byte("a") // maps a delayed message count to the first sequencer batch sequence number with this delayed count + messagePrefix []byte = []byte("m") // maps a message sequence number to a message + blockHashInputFeedPrefix []byte = []byte("b") // maps a message sequence number to a block hash received through the input feed + blockMetadataInputFeedPrefix []byte = []byte("t") // maps a message sequence number to a blockMetaData byte array received through the input feed + missingBlockMetadataInputFeedPrefix []byte = []byte("x") // maps a message sequence number whose blockMetaData byte array is missing to nil + messageResultPrefix []byte = []byte("r") // maps a message sequence number to a message result + legacyDelayedMessagePrefix []byte = []byte("d") // maps a delayed sequence number to an accumulator and a message as serialized on L1 + rlpDelayedMessagePrefix []byte = []byte("e") // maps a delayed sequence number to an accumulator and an RLP encoded message + parentChainBlockNumberPrefix []byte = []byte("p") // maps a delayed sequence number to a parent chain block number + sequencerBatchMetaPrefix []byte = []byte("s") // maps a batch sequence number to BatchMetadata + delayedSequencedPrefix []byte = []byte("a") // maps a delayed message count to the first sequencer batch sequence number with this delayed count messageCountKey []byte = []byte("_messageCount") // contains the current message count delayedMessageCountKey []byte = []byte("_delayedMessageCount") // contains the current delayed message count diff --git a/arbnode/transaction_streamer.go b/arbnode/transaction_streamer.go index 15416ad964..bfc5d952fe 100644 --- a/arbnode/transaction_streamer.go +++ b/arbnode/transaction_streamer.go @@ -67,12 +67,15 @@ type TransactionStreamer struct { broadcastServer *broadcaster.Broadcaster inboxReader *InboxReader delayedBridge *DelayedBridge + + trackBlockMetadataFrom arbutil.MessageIndex } type TransactionStreamerConfig struct { MaxBroadcasterQueueSize int `koanf:"max-broadcaster-queue-size"` MaxReorgResequenceDepth int64 `koanf:"max-reorg-resequence-depth" reload:"hot"` ExecuteMessageLoopDelay time.Duration `koanf:"execute-message-loop-delay" reload:"hot"` + TrackBlockMetadataFrom uint64 `koanf:"track-block-metadata-from"` } type TransactionStreamerConfigFetcher func() *TransactionStreamerConfig @@ -81,18 +84,21 @@ var DefaultTransactionStreamerConfig = TransactionStreamerConfig{ MaxBroadcasterQueueSize: 50_000, MaxReorgResequenceDepth: 1024, ExecuteMessageLoopDelay: time.Millisecond * 100, + TrackBlockMetadataFrom: 0, } var TestTransactionStreamerConfig = TransactionStreamerConfig{ MaxBroadcasterQueueSize: 10_000, MaxReorgResequenceDepth: 128 * 1024, ExecuteMessageLoopDelay: time.Millisecond, + TrackBlockMetadataFrom: 0, } func TransactionStreamerConfigAddOptions(prefix string, f *flag.FlagSet) { f.Int(prefix+".max-broadcaster-queue-size", DefaultTransactionStreamerConfig.MaxBroadcasterQueueSize, "maximum cache of pending broadcaster messages") f.Int64(prefix+".max-reorg-resequence-depth", DefaultTransactionStreamerConfig.MaxReorgResequenceDepth, "maximum number of messages to attempt to resequence on reorg (0 = never resequence, -1 = always resequence)") f.Duration(prefix+".execute-message-loop-delay", DefaultTransactionStreamerConfig.ExecuteMessageLoopDelay, "delay when polling calls to execute messages") + f.Uint64(prefix+".track-block-metadata-from", DefaultTransactionStreamerConfig.TrackBlockMetadataFrom, "this is the block number starting from which missing of blockmetadata is being tracked in the local disk. Setting to zero (default value) disables this") } func NewTransactionStreamer( @@ -118,6 +124,13 @@ func NewTransactionStreamer( if err != nil { return nil, err } + if config().TrackBlockMetadataFrom != 0 { + trackBlockMetadataFrom, err := exec.BlockNumberToMessageIndex(config().TrackBlockMetadataFrom) + if err != nil { + return nil, err + } + streamer.trackBlockMetadataFrom = trackBlockMetadataFrom + } return streamer, nil } @@ -386,6 +399,10 @@ func (s *TransactionStreamer) reorg(batch ethdb.Batch, count arbutil.MessageInde if err != nil { return err } + err = deleteStartingAt(s.db, batch, missingBlockMetadataInputFeedPrefix, uint64ToKey(uint64(count))) + if err != nil { + return err + } err = deleteStartingAt(s.db, batch, messagePrefix, uint64ToKey(uint64(count))) if err != nil { return err @@ -1060,6 +1077,9 @@ func (s *TransactionStreamer) writeMessage(pos arbutil.MessageIndex, msg arbosty // This also allows update of BatchGasCost in message without mistakenly erasing BlockMetadata key = dbKey(blockMetadataInputFeedPrefix, uint64(pos)) return batch.Put(key, msg.BlockMetadata) + } else if s.trackBlockMetadataFrom != 0 && pos >= s.trackBlockMetadataFrom { + key = dbKey(missingBlockMetadataInputFeedPrefix, uint64(pos)) + return batch.Put(key, nil) } return nil } @@ -1163,17 +1183,33 @@ func (s *TransactionStreamer) ResultAtCount(count arbutil.MessageIndex) (*execut return msgResult, nil } -func (s *TransactionStreamer) checkResult(msgResult *execution.MessageResult, expectedBlockHash *common.Hash) { - if expectedBlockHash == nil { +func (s *TransactionStreamer) checkResult(pos arbutil.MessageIndex, msgResult *execution.MessageResult, msgAndBlockInfo *arbostypes.MessageWithMetadataAndBlockInfo) { + if msgAndBlockInfo.BlockHash == nil { return } - if msgResult.BlockHash != *expectedBlockHash { + if msgResult.BlockHash != *msgAndBlockInfo.BlockHash { log.Error( BlockHashMismatchLogMsg, - "expected", expectedBlockHash, + "expected", msgAndBlockInfo.BlockHash, "actual", msgResult.BlockHash, ) - return + // Try deleting the existing blockMetadata for this block in arbDB and set it as missing + if msgAndBlockInfo.BlockMetadata != nil { + batch := s.db.NewBatch() + if err := batch.Delete(dbKey(blockMetadataInputFeedPrefix, uint64(pos))); err != nil { + log.Error("error deleting blockMetadata of block whose BlockHash from feed doesn't match locally computed hash", "msgSeqNum", pos, "err", err) + return + } + if s.trackBlockMetadataFrom != 0 && pos >= s.trackBlockMetadataFrom { + if err := batch.Put(dbKey(missingBlockMetadataInputFeedPrefix, uint64(pos)), nil); err != nil { + log.Error("error marking deleted blockMetadata as missing in arbDB for a block whose BlockHash from feed doesn't match locally computed hash", "msgSeqNum", pos, "err", err) + return + } + } + if err := batch.Write(); err != nil { + log.Error("error writing batch that deletes blockMetadata of the block whose BlockHash from feed doesn't match locally computed hash", "msgSeqNum", pos, "err", err) + } + } } } @@ -1241,7 +1277,7 @@ func (s *TransactionStreamer) ExecuteNextMsg(ctx context.Context) bool { } // we just log the error but not update the value in db itself with msgResult.BlockHash? and instead forward the new block hash - s.checkResult(msgResult, msgAndBlockInfo.BlockHash) + s.checkResult(pos, msgResult, msgAndBlockInfo) batch := s.db.NewBatch() err = s.storeResult(pos, *msgResult, batch) diff --git a/cmd/nitro/config_test.go b/cmd/nitro/config_test.go index ef41d704f1..9e7cb87524 100644 --- a/cmd/nitro/config_test.go +++ b/cmd/nitro/config_test.go @@ -42,7 +42,7 @@ func TestEmptyCliConfig(t *testing.T) { } func TestSeqConfig(t *testing.T) { - args := strings.Split("--persistent.chain /tmp/data --init.dev-init --node.parent-chain-reader.enable=false --parent-chain.id 5 --chain.id 421613 --node.batch-poster.parent-chain-wallet.pathname /l1keystore --node.batch-poster.parent-chain-wallet.password passphrase --http.addr 0.0.0.0 --ws.addr 0.0.0.0 --node.sequencer --execution.sequencer.enable --node.feed.output.enable --node.feed.output.port 9642", " ") + args := strings.Split("--persistent.chain /tmp/data --init.dev-init --node.parent-chain-reader.enable=false --parent-chain.id 5 --chain.id 421613 --node.batch-poster.parent-chain-wallet.pathname /l1keystore --node.batch-poster.parent-chain-wallet.password passphrase --http.addr 0.0.0.0 --ws.addr 0.0.0.0 --node.sequencer --execution.sequencer.enable --node.feed.output.enable --node.feed.output.port 9642 --node.transaction-streamer.track-block-metadata-from=10", " ") _, _, err := ParseNode(context.Background(), args) Require(t, err) } @@ -79,7 +79,7 @@ func TestInvalidArchiveConfig(t *testing.T) { } func TestAggregatorConfig(t *testing.T) { - args := strings.Split("--persistent.chain /tmp/data --init.dev-init --node.parent-chain-reader.enable=false --parent-chain.id 5 --chain.id 421613 --node.batch-poster.parent-chain-wallet.pathname /l1keystore --node.batch-poster.parent-chain-wallet.password passphrase --http.addr 0.0.0.0 --ws.addr 0.0.0.0 --node.sequencer --execution.sequencer.enable --node.feed.output.enable --node.feed.output.port 9642 --node.data-availability.enable --node.data-availability.rpc-aggregator.backends [{\"url\":\"http://localhost:8547\",\"pubkey\":\"abc==\"}]", " ") + args := strings.Split("--persistent.chain /tmp/data --init.dev-init --node.parent-chain-reader.enable=false --parent-chain.id 5 --chain.id 421613 --node.batch-poster.parent-chain-wallet.pathname /l1keystore --node.batch-poster.parent-chain-wallet.password passphrase --http.addr 0.0.0.0 --ws.addr 0.0.0.0 --node.sequencer --execution.sequencer.enable --node.feed.output.enable --node.feed.output.port 9642 --node.data-availability.enable --node.data-availability.rpc-aggregator.backends [{\"url\":\"http://localhost:8547\",\"pubkey\":\"abc==\"}] --node.transaction-streamer.track-block-metadata-from=10", " ") _, _, err := ParseNode(context.Background(), args) Require(t, err) } @@ -142,7 +142,7 @@ func TestLiveNodeConfig(t *testing.T) { jsonConfig := "{\"chain\":{\"id\":421613}}" Require(t, WriteToConfigFile(configFile, jsonConfig)) - args := strings.Split("--file-logging.enable=false --persistent.chain /tmp/data --init.dev-init --node.parent-chain-reader.enable=false --parent-chain.id 5 --node.batch-poster.parent-chain-wallet.pathname /l1keystore --node.batch-poster.parent-chain-wallet.password passphrase --http.addr 0.0.0.0 --ws.addr 0.0.0.0 --node.sequencer --execution.sequencer.enable --node.feed.output.enable --node.feed.output.port 9642", " ") + args := strings.Split("--file-logging.enable=false --persistent.chain /tmp/data --init.dev-init --node.parent-chain-reader.enable=false --parent-chain.id 5 --node.batch-poster.parent-chain-wallet.pathname /l1keystore --node.batch-poster.parent-chain-wallet.password passphrase --http.addr 0.0.0.0 --ws.addr 0.0.0.0 --node.sequencer --execution.sequencer.enable --node.feed.output.enable --node.feed.output.port 9642 --node.transaction-streamer.track-block-metadata-from=10", " ") args = append(args, []string{"--conf.file", configFile}...) config, _, err := ParseNode(context.Background(), args) Require(t, err) @@ -223,7 +223,7 @@ func TestPeriodicReloadOfLiveNodeConfig(t *testing.T) { jsonConfig := "{\"conf\":{\"reload-interval\":\"20ms\"}}" Require(t, WriteToConfigFile(configFile, jsonConfig)) - args := strings.Split("--persistent.chain /tmp/data --init.dev-init --node.parent-chain-reader.enable=false --parent-chain.id 5 --chain.id 421613 --node.batch-poster.parent-chain-wallet.pathname /l1keystore --node.batch-poster.parent-chain-wallet.password passphrase --http.addr 0.0.0.0 --ws.addr 0.0.0.0 --node.sequencer --execution.sequencer.enable --node.feed.output.enable --node.feed.output.port 9642", " ") + args := strings.Split("--persistent.chain /tmp/data --init.dev-init --node.parent-chain-reader.enable=false --parent-chain.id 5 --chain.id 421613 --node.batch-poster.parent-chain-wallet.pathname /l1keystore --node.batch-poster.parent-chain-wallet.password passphrase --http.addr 0.0.0.0 --ws.addr 0.0.0.0 --node.sequencer --execution.sequencer.enable --node.feed.output.enable --node.feed.output.port 9642 --node.transaction-streamer.track-block-metadata-from=10", " ") args = append(args, []string{"--conf.file", configFile}...) config, _, err := ParseNode(context.Background(), args) Require(t, err) diff --git a/cmd/util/confighelpers/configuration.go b/cmd/util/confighelpers/configuration.go index 8c4ef2a70b..6a139e4851 100644 --- a/cmd/util/confighelpers/configuration.go +++ b/cmd/util/confighelpers/configuration.go @@ -210,6 +210,7 @@ func devFlagArgs() []string { "--http.port", "8547", "--http.addr", "127.0.0.1", "--http.api=net,web3,eth,arb,arbdebug,debug", + "--node.transaction-streamer.track-block-metadata-from=1", } return args } diff --git a/execution/gethexec/node.go b/execution/gethexec/node.go index d699b75a20..92899121ab 100644 --- a/execution/gethexec/node.go +++ b/execution/gethexec/node.go @@ -479,6 +479,9 @@ func (n *ExecutionNode) SetConsensusClient(consensus execution.FullConsensusClie func (n *ExecutionNode) MessageIndexToBlockNumber(messageNum arbutil.MessageIndex) uint64 { return n.ExecEngine.MessageIndexToBlockNumber(messageNum) } +func (n *ExecutionNode) BlockNumberToMessageIndex(blockNum uint64) (arbutil.MessageIndex, error) { + return n.ExecEngine.BlockNumberToMessageIndex(blockNum) +} func (n *ExecutionNode) Maintenance() error { return n.ChainDB.Compact(nil, nil) diff --git a/execution/interface.go b/execution/interface.go index 666521c162..ca067240d0 100644 --- a/execution/interface.go +++ b/execution/interface.go @@ -34,6 +34,8 @@ type ExecutionClient interface { HeadMessageNumber() (arbutil.MessageIndex, error) HeadMessageNumberSync(t *testing.T) (arbutil.MessageIndex, error) ResultAtPos(pos arbutil.MessageIndex) (*MessageResult, error) + MessageIndexToBlockNumber(messageNum arbutil.MessageIndex) uint64 + BlockNumberToMessageIndex(blockNum uint64) (arbutil.MessageIndex, error) } // needed for validators / stakers diff --git a/nitro-testnode b/nitro-testnode index c177f28234..15a2bfea70 160000 --- a/nitro-testnode +++ b/nitro-testnode @@ -1 +1 @@ -Subproject commit c177f282340285bcdae2d6a784547e2bb8b97498 +Subproject commit 15a2bfea7030377771c5d2749f24afc6b48c5deb diff --git a/system_tests/timeboost_test.go b/system_tests/timeboost_test.go index a16f67f522..8bbeab61c9 100644 --- a/system_tests/timeboost_test.go +++ b/system_tests/timeboost_test.go @@ -47,13 +47,13 @@ import ( "github.com/offchainlabs/nitro/util/colors" "github.com/offchainlabs/nitro/util/containers" "github.com/offchainlabs/nitro/util/redisutil" + "github.com/offchainlabs/nitro/util/rpcclient" "github.com/offchainlabs/nitro/util/stopwaiter" "github.com/offchainlabs/nitro/util/testhelpers" ) -func blockMetadataInputFeedKey(pos uint64) []byte { +func dbKey(prefix []byte, pos uint64) []byte { var key []byte - prefix := []byte("t") key = append(key, prefix...) data := make([]byte, 8) binary.BigEndian.PutUint64(data, pos) @@ -61,6 +61,132 @@ func blockMetadataInputFeedKey(pos uint64) []byte { return key } +func TestTimeboostBulkBlockMetadataFetcher(t *testing.T) { + t.Parallel() + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + builder := NewNodeBuilder(ctx).DefaultConfig(t, true) + httpConfig := genericconf.HTTPConfigDefault + httpConfig.Addr = "127.0.0.1" + httpConfig.Apply(builder.l2StackConfig) + builder.execConfig.BlockMetadataApiCacheSize = 0 // Caching is disabled + builder.nodeConfig.TransactionStreamer.TrackBlockMetadataFrom = 1 + cleanupSeq := builder.Build(t) + defer cleanupSeq() + + // Generate blocks until current block is > 20 + arbDb := builder.L2.ConsensusNode.ArbDB + builder.L2Info.GenerateAccount("User") + user := builder.L2Info.GetDefaultTransactOpts("User", ctx) + var latestL2 uint64 + var err error + var lastTx *types.Transaction + for i := 0; ; i++ { + lastTx, _ = builder.L2.TransferBalanceTo(t, "Owner", util.RemapL1Address(user.From), big.NewInt(1e18), builder.L2Info) + latestL2, err = builder.L2.Client.BlockNumber(ctx) + Require(t, err) + // Clean BlockMetadata from arbDB so that we can modify it at will + Require(t, arbDb.Delete(dbKey([]byte("t"), latestL2))) + if latestL2 > uint64(20) { + break + } + } + var sampleBulkData []common.BlockMetadata + for i := 1; i <= int(latestL2); i++ { + // #nosec G115 + blockMetadata := []byte{0, uint8(i)} + sampleBulkData = append(sampleBulkData, blockMetadata) + // #nosec G115 + Require(t, arbDb.Put(dbKey([]byte("t"), uint64(i)), blockMetadata)) + } + + nodecfg := arbnode.ConfigDefaultL1NonSequencerTest() + trackBlockMetadataFrom := uint64(5) + nodecfg.TransactionStreamer.TrackBlockMetadataFrom = trackBlockMetadataFrom + newNode, cleanupNewNode := builder.Build2ndNode(t, &SecondNodeParams{ + nodeConfig: nodecfg, + stackConfig: testhelpers.CreateStackConfigForTest(t.TempDir()), + }) + defer cleanupNewNode() + + // Wait for second node to catchup via L1, since L1 doesn't have the blockMetadata, we ensure that messages are tracked with missingBlockMetadataInputFeedPrefix prefix + _, err = WaitForTx(ctx, newNode.Client, lastTx.Hash(), time.Second*5) + Require(t, err) + + blockMetadataInputFeedPrefix := []byte("t") + missingBlockMetadataInputFeedPrefix := []byte("x") + arbDb = newNode.ConsensusNode.ArbDB + + // Introduce fragmentation + blocksWithBlockMetadata := []uint64{8, 9, 10, 14, 16} + for _, key := range blocksWithBlockMetadata { + Require(t, arbDb.Put(dbKey([]byte("t"), key), sampleBulkData[key-1])) + Require(t, arbDb.Delete(dbKey([]byte("x"), key))) + } + + // Check if all block numbers with missingBlockMetadataInputFeedPrefix are present as keys in arbDB and that no keys with blockMetadataInputFeedPrefix + iter := arbDb.NewIterator(blockMetadataInputFeedPrefix, nil) + pos := uint64(0) + for iter.Next() { + keyBytes := bytes.TrimPrefix(iter.Key(), blockMetadataInputFeedPrefix) + if binary.BigEndian.Uint64(keyBytes) != blocksWithBlockMetadata[pos] { + t.Fatalf("unexpected presence of blockMetadata, when blocks are synced via L1. msgSeqNum: %d, expectedMsgSeqNum: %d", binary.BigEndian.Uint64(keyBytes), blocksWithBlockMetadata[pos]) + } + pos++ + } + iter.Release() + iter = arbDb.NewIterator(missingBlockMetadataInputFeedPrefix, nil) + pos = trackBlockMetadataFrom + i := 0 + for iter.Next() { + // Blocks with blockMetadata present shouldn't have the missingBlockMetadataInputFeedPrefix keys present in arbDB + for i < len(blocksWithBlockMetadata) && blocksWithBlockMetadata[i] == pos { + i++ + pos++ + } + keyBytes := bytes.TrimPrefix(iter.Key(), missingBlockMetadataInputFeedPrefix) + if binary.BigEndian.Uint64(keyBytes) != pos { + t.Fatalf("unexpected msgSeqNum with missingBlockMetadataInputFeedPrefix for blockMetadata. Want: %d, Got: %d", pos, binary.BigEndian.Uint64(keyBytes)) + } + pos++ + } + if pos-1 != latestL2 { + t.Fatalf("number of keys with missingBlockMetadataInputFeedPrefix doesn't match expected value. Want: %d, Got: %d", latestL2, pos-1) + } + iter.Release() + + // Rebuild blockMetadata and cleanup trackers from ArbDB + blockMetadataFetcher, err := arbnode.NewBlockMetadataFetcher(ctx, arbnode.BlockMetadataFetcherConfig{Source: rpcclient.ClientConfig{URL: builder.L2.Stack.HTTPEndpoint()}}, arbDb, newNode.ExecNode) + Require(t, err) + blockMetadataFetcher.Update(ctx) + + // Check if all blockMetadata was synced from bulk BlockMetadata API via the blockMetadataFetcher and that trackers for missing blockMetadata were cleared + iter = arbDb.NewIterator(blockMetadataInputFeedPrefix, nil) + pos = trackBlockMetadataFrom + for iter.Next() { + keyBytes := bytes.TrimPrefix(iter.Key(), blockMetadataInputFeedPrefix) + if binary.BigEndian.Uint64(keyBytes) != pos { + t.Fatalf("unexpected msgSeqNum with blockMetadataInputFeedPrefix for blockMetadata. Want: %d, Got: %d", pos, binary.BigEndian.Uint64(keyBytes)) + } + if !bytes.Equal(sampleBulkData[pos-1], iter.Value()) { + t.Fatalf("blockMetadata mismatch for blockNumber: %d. Want: %v, Got: %v", pos, sampleBulkData[pos-1], iter.Value()) + } + pos++ + } + if pos-1 != latestL2 { + t.Fatalf("number of keys with blockMetadataInputFeedPrefix doesn't match expected value. Want: %d, Got: %d", latestL2, pos-1) + } + iter.Release() + iter = arbDb.NewIterator(missingBlockMetadataInputFeedPrefix, nil) + for iter.Next() { + keyBytes := bytes.TrimPrefix(iter.Key(), missingBlockMetadataInputFeedPrefix) + t.Fatalf("unexpected presence of msgSeqNum with missingBlockMetadataInputFeedPrefix, indicating missing of some blockMetadata after rebuilding. msgSeqNum: %d", binary.BigEndian.Uint64(keyBytes)) + } + iter.Release() +} + func TestTimeboostedFieldInReceiptsObject(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() @@ -88,7 +214,7 @@ func TestTimeboostedFieldInReceiptsObject(t *testing.T) { for i := uint64(1); i < latestL2; i++ { // Clean BlockMetadata from arbDB so that we can modify it at will - Require(t, arbDb.Delete(blockMetadataInputFeedKey(i))) + Require(t, arbDb.Delete(dbKey([]byte("t"), i))) } block, err := builder.L2.Client.BlockByNumber(ctx, blockNum) @@ -98,7 +224,7 @@ func TestTimeboostedFieldInReceiptsObject(t *testing.T) { } // Set first tx (internal tx anyway) to not timeboosted and Second one to timeboosted- BlockMetadata (in bits)-> 00000000 00000010 - Require(t, arbDb.Put(blockMetadataInputFeedKey(blockNum.Uint64()), []byte{0, 2})) + Require(t, arbDb.Put(dbKey([]byte("t"), blockNum.Uint64()), []byte{0, 2})) l2rpc := builder.L2.Stack.Attach() // Extra timeboosted field in pointer form to check for its existence type timeboostedFromReceipt struct { @@ -190,7 +316,7 @@ func TestTimeboostBulkBlockMetadataAPI(t *testing.T) { latestL2, err := builder.L2.Client.BlockNumber(ctx) Require(t, err) // Clean BlockMetadata from arbDB so that we can modify it at will - Require(t, arbDb.Delete(blockMetadataInputFeedKey(latestL2))) + Require(t, arbDb.Delete(dbKey([]byte("t"), latestL2))) // #nosec G115 if latestL2 > uint64(end)+10 { break @@ -205,7 +331,7 @@ func TestTimeboostBulkBlockMetadataAPI(t *testing.T) { RawMetadata: []byte{0, uint8(i)}, } sampleBulkData = append(sampleBulkData, sampleData) - Require(t, arbDb.Put(blockMetadataInputFeedKey(sampleData.BlockNumber), sampleData.RawMetadata)) + Require(t, arbDb.Put(dbKey([]byte("t"), sampleData.BlockNumber), sampleData.RawMetadata)) } l2rpc := builder.L2.Stack.Attach() @@ -227,7 +353,7 @@ func TestTimeboostBulkBlockMetadataAPI(t *testing.T) { // Test that without cache the result returned is always in sync with ArbDB sampleBulkData[0].RawMetadata = []byte{1, 11} - Require(t, arbDb.Put(blockMetadataInputFeedKey(1), sampleBulkData[0].RawMetadata)) + Require(t, arbDb.Put(dbKey([]byte("t"), 1), sampleBulkData[0].RawMetadata)) err = l2rpc.CallContext(ctx, &result, "arb_getRawBlockMetadata", rpc.BlockNumber(1), rpc.BlockNumber(1)) Require(t, err) @@ -248,7 +374,7 @@ func TestTimeboostBulkBlockMetadataAPI(t *testing.T) { arbDb = builder.L2.ConsensusNode.ArbDB updatedBlockMetadata := []byte{2, 12} - Require(t, arbDb.Put(blockMetadataInputFeedKey(1), updatedBlockMetadata)) + Require(t, arbDb.Put(dbKey([]byte("t"), 1), updatedBlockMetadata)) err = l2rpc.CallContext(ctx, &result, "arb_getRawBlockMetadata", rpc.BlockNumber(1), rpc.BlockNumber(1)) Require(t, err) @@ -720,6 +846,7 @@ func setupExpressLaneAuction( Enable: false, // We need to start without timeboost initially to create the auction contract ExpressLaneAdvantage: time.Second * 5, } + builderSeq.nodeConfig.TransactionStreamer.TrackBlockMetadataFrom = 1 cleanupSeq := builderSeq.Build(t) seqInfo, seqNode, seqClient := builderSeq.L2Info, builderSeq.L2.ConsensusNode, builderSeq.L2.Client diff --git a/util/common.go b/util/common.go new file mode 100644 index 0000000000..fc71e4a704 --- /dev/null +++ b/util/common.go @@ -0,0 +1,9 @@ +package util + +func ArrayToSet[T comparable](arr []T) map[T]struct{} { + ret := make(map[T]struct{}) + for _, elem := range arr { + ret[elem] = struct{}{} + } + return ret +}