diff --git a/arbnode/batch_poster.go b/arbnode/batch_poster.go
index 70c5952042..3adedb4ac1 100644
--- a/arbnode/batch_poster.go
+++ b/arbnode/batch_poster.go
@@ -97,7 +97,7 @@ type BatchPoster struct {
 	l1Reader           *headerreader.HeaderReader
 	inbox              *InboxTracker
 	streamer           *TransactionStreamer
-	arbOSVersionGetter execution.FullExecutionClient
+	arbOSVersionGetter execution.ExecutionBatchPoster
 	config             BatchPosterConfigFetcher
 	seqInbox           *bridgegen.SequencerInbox
 	syncMonitor        *SyncMonitor
@@ -307,7 +307,7 @@ type BatchPosterOpts struct {
 	L1Reader      *headerreader.HeaderReader
 	Inbox         *InboxTracker
 	Streamer      *TransactionStreamer
-	VersionGetter execution.FullExecutionClient
+	VersionGetter execution.ExecutionBatchPoster
 	SyncMonitor   *SyncMonitor
 	Config        BatchPosterConfigFetcher
 	DeployInfo    *chaininfo.RollupAddresses
diff --git a/arbnode/inbox_test.go b/arbnode/inbox_test.go
index 0c31008ff1..2207e47b4e 100644
--- a/arbnode/inbox_test.go
+++ b/arbnode/inbox_test.go
@@ -23,27 +23,81 @@ import (
 	"github.com/offchainlabs/nitro/arbos/l2pricing"
 	"github.com/offchainlabs/nitro/arbutil"
 	"github.com/offchainlabs/nitro/cmd/chaininfo"
+	"github.com/offchainlabs/nitro/execution"
 	"github.com/offchainlabs/nitro/execution/gethexec"
 	"github.com/offchainlabs/nitro/statetransfer"
 	"github.com/offchainlabs/nitro/util/arbmath"
+	"github.com/offchainlabs/nitro/util/containers"
 	"github.com/offchainlabs/nitro/util/testhelpers"
 	"github.com/offchainlabs/nitro/util/testhelpers/env"
 )
 
 type execClientWrapper struct {
-	*gethexec.ExecutionEngine
-	t *testing.T
+	ExecutionEngine *gethexec.ExecutionEngine
+	t               *testing.T
 }
 
-func (w *execClientWrapper) Pause()    { w.t.Error("not supported") }
-func (w *execClientWrapper) Activate() { w.t.Error("not supported") }
+func (w *execClientWrapper) Pause() { w.t.Error("not supported") }
+
+func (w *execClientWrapper) Activate() { w.t.Error("not supported") }
+
 func (w *execClientWrapper) ForwardTo(url string) error { w.t.Error("not supported"); return nil }
-func (w *execClientWrapper) Synced() bool { w.t.Error("not supported"); return false }
+
+func (w *execClientWrapper) SequenceDelayedMessage(message *arbostypes.L1IncomingMessage, delayedSeqNum uint64) error {
+	return w.ExecutionEngine.SequenceDelayedMessage(message, delayedSeqNum)
+}
+
+func (w *execClientWrapper) NextDelayedMessageNumber() (uint64, error) {
+	return w.ExecutionEngine.NextDelayedMessageNumber()
+}
+
+func (w *execClientWrapper) MarkFeedStart(to arbutil.MessageIndex) containers.PromiseInterface[struct{}] {
+	markFeedStartWithReturn := func(to arbutil.MessageIndex) (struct{}, error) {
+		w.ExecutionEngine.MarkFeedStart(to)
+		return struct{}{}, nil
+	}
+	return containers.NewReadyPromise(markFeedStartWithReturn(to))
+}
+
+func (w *execClientWrapper) Maintenance() containers.PromiseInterface[struct{}] {
+	return containers.NewReadyPromise(struct{}{}, nil)
+}
+
+func (w *execClientWrapper) Synced() bool { w.t.Error("not supported"); return false }
+
 func (w *execClientWrapper) FullSyncProgressMap() map[string]interface{} {
 	w.t.Error("not supported")
 	return nil
 }
 
+func (w *execClientWrapper) DigestMessage(num arbutil.MessageIndex, msg *arbostypes.MessageWithMetadata, msgForPrefetch *arbostypes.MessageWithMetadata) containers.PromiseInterface[*execution.MessageResult] {
+	return containers.NewReadyPromise(w.ExecutionEngine.DigestMessage(num, msg, msgForPrefetch))
+}
+
+func (w *execClientWrapper) Reorg(count arbutil.MessageIndex, newMessages []arbostypes.MessageWithMetadataAndBlockHash, oldMessages []*arbostypes.MessageWithMetadata) containers.PromiseInterface[[]*execution.MessageResult] {
+	return containers.NewReadyPromise(w.ExecutionEngine.Reorg(count, newMessages, oldMessages))
+}
+
+func (w *execClientWrapper) HeadMessageNumber() containers.PromiseInterface[arbutil.MessageIndex] {
+	return containers.NewReadyPromise(w.ExecutionEngine.HeadMessageNumber())
+}
+
+func (w *execClientWrapper) HeadMessageNumberSync(t *testing.T) containers.PromiseInterface[arbutil.MessageIndex] {
+	return containers.NewReadyPromise(w.ExecutionEngine.HeadMessageNumberSync(t))
+}
+
+func (w *execClientWrapper) ResultAtPos(pos arbutil.MessageIndex) containers.PromiseInterface[*execution.MessageResult] {
+	return containers.NewReadyPromise(w.ExecutionEngine.ResultAtPos(pos))
+}
+
+func (w *execClientWrapper) Start(ctx context.Context) containers.PromiseInterface[struct{}] {
+	return containers.NewReadyPromise(struct{}{}, nil)
+}
+
+func (w *execClientWrapper) StopAndWait() containers.PromiseInterface[struct{}] {
+	return containers.NewReadyPromise(struct{}{}, nil)
+}
+
 func NewTransactionStreamerForTest(t *testing.T, ownerAddress common.Address) (*gethexec.ExecutionEngine, *TransactionStreamer, ethdb.Database, *core.BlockChain) {
 	chainConfig := chaininfo.ArbitrumDevTestChainConfig()
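A note for reviewers on the wrapper methods above: as used throughout this diff, containers.NewReadyPromise(v, err) wraps a result that has already been computed, so the engine call still runs synchronously and Await merely unwraps the stored pair. The toy program below is an illustrative stand-in (not the real util/containers implementation) showing just that contract:

package main

import (
	"context"
	"errors"
	"fmt"
)

// readyPromise mimics what containers.NewReadyPromise returns: the value and
// error are fixed at construction time, and Await only hands them back.
type readyPromise[T any] struct {
	val T
	err error
}

func (p readyPromise[T]) Await(_ context.Context) (T, error) { return p.val, p.err }

func newReadyPromise[T any](val T, err error) readyPromise[T] { return readyPromise[T]{val, err} }

// digestMessage mirrors the execClientWrapper pattern: run the synchronous
// call first, then wrap its (value, error) pair in a ready promise.
func digestMessage(num uint64) readyPromise[string] {
	if num == 0 {
		return newReadyPromise("", errors.New("no message at position 0"))
	}
	return newReadyPromise(fmt.Sprintf("result-%d", num), nil)
}

func main() {
	res, err := digestMessage(5).Await(context.Background())
	fmt.Println(res, err) // prints "result-5 <nil>"
}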
diff --git a/arbnode/maintenance.go b/arbnode/maintenance.go
index 5e4e56b577..5968cec75d 100644
--- a/arbnode/maintenance.go
+++ b/arbnode/maintenance.go
@@ -24,7 +24,7 @@ import (
 type MaintenanceRunner struct {
 	stopwaiter.StopWaiter
 
-	exec           execution.FullExecutionClient
+	exec           execution.ExecutionClient
 	config         MaintenanceConfigFetcher
 	seqCoordinator *SeqCoordinator
 	dbs            []ethdb.Database
@@ -87,7 +87,7 @@ var DefaultMaintenanceConfig = MaintenanceConfig{
 
 type MaintenanceConfigFetcher func() *MaintenanceConfig
 
-func NewMaintenanceRunner(config MaintenanceConfigFetcher, seqCoordinator *SeqCoordinator, dbs []ethdb.Database, exec execution.FullExecutionClient) (*MaintenanceRunner, error) {
+func NewMaintenanceRunner(config MaintenanceConfigFetcher, seqCoordinator *SeqCoordinator, dbs []ethdb.Database, exec execution.ExecutionClient) (*MaintenanceRunner, error) {
 	cfg := config()
 	if err := cfg.Validate(); err != nil {
 		return nil, fmt.Errorf("validating config: %w", err)
@@ -183,7 +183,8 @@ func (mr *MaintenanceRunner) runMaintenance() {
 	}
 	expected++
 	go func() {
-		results <- mr.exec.Maintenance()
+		_, res := mr.exec.Maintenance().Await(mr.GetContext())
+		results <- res
 	}()
 	for i := 0; i < expected; i++ {
 		err := <-results
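The runMaintenance hunk above is the same migration seen from a call site: the goroutine now awaits the promise and forwards only the error into the collection channel. A minimal standalone rendering of that shape (assumed types, not the real MaintenanceRunner):

package main

import (
	"context"
	"fmt"
)

// maintenancePromise stands in for containers.PromiseInterface[struct{}].
type maintenancePromise struct{ err error }

func (p maintenancePromise) Await(context.Context) (struct{}, error) { return struct{}{}, p.err }

// maintain models exec.Maintenance(): the work is done eagerly and the
// outcome is carried by the returned promise.
func maintain() maintenancePromise { return maintenancePromise{err: nil} }

func main() {
	results := make(chan error, 1)
	go func() {
		// As in runMaintenance: unwrap the promise, keep only the error.
		_, err := maintain().Await(context.Background())
		results <- err
	}()
	fmt.Println("maintenance error:", <-results)
}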
diff --git a/arbnode/node.go b/arbnode/node.go
index b9ac975176..8f2502ff71 100644
--- a/arbnode/node.go
+++ b/arbnode/node.go
@@ -263,7 +263,9 @@ func DangerousConfigAddOptions(prefix string, f *flag.FlagSet) {
 type Node struct {
 	ArbDB                   ethdb.Database
 	Stack                   *node.Node
-	Execution               execution.FullExecutionClient
+	ExecutionClient         execution.ExecutionClient
+	ExecutionSequencer      execution.ExecutionSequencer
+	ExecutionRecorder       execution.ExecutionRecorder
 	L1Reader                *headerreader.HeaderReader
 	TxStreamer              *TransactionStreamer
 	DeployInfo              *chaininfo.RollupAddresses
@@ -412,45 +414,38 @@ func StakerDataposter(
 	})
 }
 
-func createNodeImpl(
-	ctx context.Context,
-	stack *node.Node,
-	exec execution.FullExecutionClient,
-	arbDb ethdb.Database,
-	configFetcher ConfigFetcher,
-	l2Config *params.ChainConfig,
-	l1client *ethclient.Client,
-	deployInfo *chaininfo.RollupAddresses,
-	txOptsValidator *bind.TransactOpts,
-	txOptsBatchPoster *bind.TransactOpts,
-	dataSigner signature.DataSignerFunc,
-	fatalErrChan chan error,
-	parentChainID *big.Int,
-	blobReader daprovider.BlobReader,
-) (*Node, error) {
-	config := configFetcher.Get()
-
-	err := checkArbDbSchemaVersion(arbDb)
-	if err != nil {
-		return nil, err
-	}
-
-	l2ChainId := l2Config.ChainID.Uint64()
-
+func getSyncMonitor(configFetcher ConfigFetcher) *SyncMonitor {
 	syncConfigFetcher := func() *SyncMonitorConfig {
 		return &configFetcher.Get().SyncMonitor
 	}
-	syncMonitor := NewSyncMonitor(syncConfigFetcher)
+	return NewSyncMonitor(syncConfigFetcher)
+}
 
+func getL1Reader(
+	ctx context.Context,
+	config *Config,
+	configFetcher ConfigFetcher,
+	l1client *ethclient.Client,
+) (*headerreader.HeaderReader, error) {
 	var l1Reader *headerreader.HeaderReader
 	if config.ParentChainReader.Enable {
 		arbSys, _ := precompilesgen.NewArbSys(types.ArbSysAddress, l1client)
+		var err error
 		l1Reader, err = headerreader.New(ctx, l1client, func() *headerreader.Config { return &configFetcher.Get().ParentChainReader }, arbSys)
 		if err != nil {
 			return nil, err
 		}
 	}
+	return l1Reader, nil
+}
 
+func getBroadcastServer(
+	config *Config,
+	configFetcher ConfigFetcher,
+	dataSigner signature.DataSignerFunc,
+	l2ChainId uint64,
+	fatalErrChan chan error,
+) (*broadcaster.Broadcaster, error) {
 	var broadcastServer *broadcaster.Broadcaster
 	if config.Feed.Output.Enable {
 		var maybeDataSigner signature.DataSignerFunc
@@ -462,13 +457,13 @@ func createNodeImpl(
 		}
 		broadcastServer = broadcaster.NewBroadcaster(func() *wsbroadcastserver.BroadcasterConfig { return &configFetcher.Get().Feed.Output }, l2ChainId, fatalErrChan, maybeDataSigner)
 	}
+	return broadcastServer, nil
+}
 
-	transactionStreamerConfigFetcher := func() *TransactionStreamerConfig { return &configFetcher.Get().TransactionStreamer }
-	txStreamer, err := NewTransactionStreamer(arbDb, l2Config, exec, broadcastServer, fatalErrChan, transactionStreamerConfigFetcher, &configFetcher.Get().SnapSyncTest)
-	if err != nil {
-		return nil, err
-	}
-	var coordinator *SeqCoordinator
+func getBPVerifier(
+	deployInfo *chaininfo.RollupAddresses,
+	l1client *ethclient.Client,
+) (*contracts.AddressVerifier, error) {
 	var bpVerifier *contracts.AddressVerifier
 	if deployInfo != nil && l1client != nil {
 		sequencerInboxAddr := deployInfo.SequencerInbox
@@ -479,21 +474,31 @@ func createNodeImpl(
 		}
 		bpVerifier = contracts.NewAddressVerifier(seqInboxCaller)
 	}
+	return bpVerifier, nil
+}
 
-	if config.SeqCoordinator.Enable {
-		coordinator, err = NewSeqCoordinator(dataSigner, bpVerifier, txStreamer, exec, syncMonitor, config.SeqCoordinator)
-		if err != nil {
-			return nil, err
-		}
-	} else if config.Sequencer && !config.Dangerous.NoSequencerCoordinator {
-		return nil, errors.New("sequencer must be enabled with coordinator, unless dangerous.no-sequencer-coordinator set")
-	}
+func getMaintenanceRunner(
+	arbDb ethdb.Database,
+	configFetcher ConfigFetcher,
+	coordinator *SeqCoordinator,
+	exec execution.ExecutionClient,
+) (*MaintenanceRunner, error) {
 	dbs := []ethdb.Database{arbDb}
 	maintenanceRunner, err := NewMaintenanceRunner(func() *MaintenanceConfig { return &configFetcher.Get().Maintenance }, coordinator, dbs, exec)
 	if err != nil {
 		return nil, err
 	}
+	return maintenanceRunner, nil
+}
 
+func getBroadcastClients(
+	config *Config,
+	configFetcher ConfigFetcher,
+	txStreamer *TransactionStreamer,
+	l2ChainId uint64,
+	bpVerifier *contracts.AddressVerifier,
+	fatalErrChan chan error,
+) (*broadcastclients.BroadcastClients, error) {
 	var broadcastClients *broadcastclients.BroadcastClients
 	if config.Feed.Input.Enable() {
 		currentMessageCount, err := txStreamer.GetMessageCount()
@@ -514,62 +519,54 @@ func createNodeImpl(
 			return nil, err
 		}
 	}
+	return broadcastClients, nil
+}
 
-	if !config.ParentChainReader.Enable {
-		return &Node{
-			ArbDB:                   arbDb,
-			Stack:                   stack,
-			Execution:               exec,
-			L1Reader:                nil,
-			TxStreamer:              txStreamer,
-			DeployInfo:              nil,
-			BlobReader:              blobReader,
-			InboxReader:             nil,
-			InboxTracker:            nil,
-			DelayedSequencer:        nil,
-			BatchPoster:             nil,
-			MessagePruner:           nil,
-			BlockValidator:          nil,
-			StatelessBlockValidator: nil,
-			Staker:                  nil,
-			BroadcastServer:         broadcastServer,
-			BroadcastClients:        broadcastClients,
-			SeqCoordinator:          coordinator,
-			MaintenanceRunner:       maintenanceRunner,
-			DASLifecycleManager:     nil,
-			SyncMonitor:             syncMonitor,
-			configFetcher:           configFetcher,
-			ctx:                     ctx,
-		}, nil
-	}
-
+func getDelayedBridgeAndSequencerInbox(
+	deployInfo *chaininfo.RollupAddresses,
+	l1client *ethclient.Client,
+) (*DelayedBridge, *SequencerInbox, error) {
 	if deployInfo == nil {
-		return nil, errors.New("deployinfo is nil")
+		return nil, nil, errors.New("deployinfo is nil")
 	}
 	delayedBridge, err := NewDelayedBridge(l1client, deployInfo.Bridge, deployInfo.DeployedAt)
 	if err != nil {
-		return nil, err
+		return nil, nil, err
 	}
 	// #nosec G115
 	sequencerInbox, err := NewSequencerInbox(l1client, deployInfo.SequencerInbox, int64(deployInfo.DeployedAt))
 	if err != nil {
-		return nil, err
+		return nil, nil, err
 	}
+	return delayedBridge, sequencerInbox, nil
+}
 
+func getDAS(
+	ctx context.Context,
+	config *Config,
+	l2Config *params.ChainConfig,
+	txStreamer *TransactionStreamer,
+	blobReader daprovider.BlobReader,
+	l1Reader *headerreader.HeaderReader,
+	deployInfo *chaininfo.RollupAddresses,
+	dataSigner signature.DataSignerFunc,
+	l1client *ethclient.Client,
+) (das.DataAvailabilityServiceWriter, *das.LifecycleManager, []daprovider.Reader, error) {
 	var daWriter das.DataAvailabilityServiceWriter
 	var daReader das.DataAvailabilityServiceReader
 	var dasLifecycleManager *das.LifecycleManager
 	var dasKeysetFetcher *das.KeysetFetcher
 	if config.DataAvailability.Enable {
+		var err error
 		if config.BatchPoster.Enable {
 			daWriter, daReader, dasKeysetFetcher, dasLifecycleManager, err = das.CreateBatchPosterDAS(ctx, &config.DataAvailability, dataSigner, l1client, deployInfo.SequencerInbox)
 			if err != nil {
-				return nil, err
+				return nil, nil, nil, err
 			}
 		} else {
 			daReader, dasKeysetFetcher, dasLifecycleManager, err = das.CreateDAReaderForNode(ctx, &config.DataAvailability, l1Reader, &deployInfo.SequencerInbox)
 			if err != nil {
-				return nil, err
+				return nil, nil, nil, err
 			}
 		}
@@ -582,12 +579,12 @@ func createNodeImpl(
 			daReader = das.NewReaderPanicWrapper(daReader)
 		}
 	} else if l2Config.ArbitrumChainParams.DataAvailabilityCommittee {
-		return nil, errors.New("a data availability service is required for this chain, but it was not configured")
+		return nil, nil, nil, errors.New("a data availability service is required for this chain, but it was not configured")
 	}
 
 	// We support a nil txStreamer for the pruning code
 	if txStreamer != nil && txStreamer.chainConfig.ArbitrumChainParams.DataAvailabilityCommittee && daReader == nil {
-		return nil, errors.New("data availability service required but unconfigured")
+		return nil, nil, nil, errors.New("data availability service required but unconfigured")
 	}
 	var dapReaders []daprovider.Reader
 	if daReader != nil {
@@ -596,16 +593,38 @@ func createNodeImpl(
 	if blobReader != nil {
 		dapReaders = append(dapReaders, daprovider.NewReaderForBlobReader(blobReader))
 	}
+
+	return daWriter, dasLifecycleManager, dapReaders, nil
+}
+
+func getInboxTrackerAndReader(
+	ctx context.Context,
+	exec execution.ExecutionSequencer,
+	arbDb ethdb.Database,
+	txStreamer *TransactionStreamer,
+	dapReaders []daprovider.Reader,
+	config *Config,
+	configFetcher ConfigFetcher,
+	l1client *ethclient.Client,
+	l1Reader *headerreader.HeaderReader,
+	deployInfo *chaininfo.RollupAddresses,
+	delayedBridge *DelayedBridge,
+	sequencerInbox *SequencerInbox,
+) (*InboxTracker, *InboxReader, error) {
 	inboxTracker, err := NewInboxTracker(arbDb, txStreamer, dapReaders, config.SnapSyncTest)
 	if err != nil {
-		return nil, err
+		return nil, nil, err
 	}
 	firstMessageBlock := new(big.Int).SetUint64(deployInfo.DeployedAt)
 	if config.SnapSyncTest.Enabled {
+		if exec == nil {
+			return nil, nil, errors.New("snap sync test requires an execution sequencer")
+		}
+
 		batchCount := config.SnapSyncTest.BatchCount
 		delayedMessageNumber, err := exec.NextDelayedMessageNumber()
 		if err != nil {
-			return nil, err
+			return nil, nil, err
 		}
 		if batchCount > delayedMessageNumber {
 			batchCount = delayedMessageNumber
@@ -618,39 +637,28 @@ func createNodeImpl(
 		}
 		block, err := FindBlockContainingBatchCount(ctx, deployInfo.Bridge, l1client, config.SnapSyncTest.ParentChainAssertionBlock, batchCount)
 		if err != nil {
-			return nil, err
+			return nil, nil, err
 		}
 		firstMessageBlock.SetUint64(block)
 	}
 	inboxReader, err := NewInboxReader(inboxTracker, l1client, l1Reader, firstMessageBlock, delayedBridge, sequencerInbox, func() *InboxReaderConfig { return &configFetcher.Get().InboxReader })
 	if err != nil {
-		return nil, err
+		return nil, nil, err
 	}
 	txStreamer.SetInboxReaders(inboxReader, delayedBridge)
 
-	var statelessBlockValidator *staker.StatelessBlockValidator
-	if config.BlockValidator.RedisValidationClientConfig.Enabled() || config.BlockValidator.ValidationServerConfigs[0].URL != "" {
-		statelessBlockValidator, err = staker.NewStatelessBlockValidator(
-			inboxReader,
-			inboxTracker,
-			txStreamer,
-			exec,
-			rawdb.NewTable(arbDb, storage.BlockValidatorPrefix),
-			dapReaders,
-			func() *staker.BlockValidatorConfig { return &configFetcher.Get().BlockValidator },
-			stack,
-		)
-	} else {
-		err = errors.New("no validator url specified")
-	}
-	if err != nil {
-		if config.ValidatorRequired() || config.Staker.Enable {
-			return nil, fmt.Errorf("%w: failed to init block validator", err)
-		}
-		log.Warn("validation not supported", "err", err)
-		statelessBlockValidator = nil
-	}
+	return inboxTracker, inboxReader, nil
+}
 
+func getBlockValidator(
+	config *Config,
+	configFetcher ConfigFetcher,
+	statelessBlockValidator *staker.StatelessBlockValidator,
+	inboxTracker *InboxTracker,
+	txStreamer *TransactionStreamer,
+	fatalErrChan chan error,
+) (*staker.BlockValidator, error) {
+	var err error
 	var blockValidator *staker.BlockValidator
 	if config.ValidatorRequired() {
 		blockValidator, err = staker.NewBlockValidator(
@@ -664,7 +672,27 @@ func createNodeImpl(
 			return nil, err
 		}
 	}
+	return blockValidator, err
+}
 
+func getStaker(
+	ctx context.Context,
+	config *Config,
+	configFetcher ConfigFetcher,
+	arbDb ethdb.Database,
+	l1Reader *headerreader.HeaderReader,
+	txOptsValidator *bind.TransactOpts,
+	syncMonitor *SyncMonitor,
+	parentChainID *big.Int,
+	l1client *ethclient.Client,
+	deployInfo *chaininfo.RollupAddresses,
+	txStreamer *TransactionStreamer,
+	inboxTracker *InboxTracker,
+	stack *node.Node,
+	fatalErrChan chan error,
+	statelessBlockValidator *staker.StatelessBlockValidator,
+	blockValidator *staker.BlockValidator,
+) (*multiprotocolstaker.MultiProtocolStaker, *MessagePruner, common.Address, error) {
 	var stakerObj *multiprotocolstaker.MultiProtocolStaker
 	var messagePruner *MessagePruner
 	var stakerAddr common.Address
@@ -680,7 +708,7 @@ func createNodeImpl(
 			parentChainID,
 		)
 		if err != nil {
-			return nil, err
+			return nil, nil, common.Address{}, err
 		}
 		getExtraGas := func() uint64 { return configFetcher.Get().Staker.ExtraGas }
 		// TODO: factor this out into separate helper, and split rest of node
@@ -692,7 +720,7 @@ func createNodeImpl(
 			if len(config.Staker.ContractWalletAddress) > 0 {
 				if !common.IsHexAddress(config.Staker.ContractWalletAddress) {
 					log.Error("invalid validator smart contract wallet", "addr", config.Staker.ContractWalletAddress)
-					return nil, errors.New("invalid validator smart contract wallet address")
+					return nil, nil, common.Address{}, errors.New("invalid validator smart contract wallet address")
 				}
 				tmpAddress := common.HexToAddress(config.Staker.ContractWalletAddress)
 				existingWalletAddress = &tmpAddress
@@ -700,15 +728,15 @@ func createNodeImpl(
 			// #nosec G115
 			wallet, err = validatorwallet.NewContract(dp, existingWalletAddress, deployInfo.ValidatorWalletCreator, deployInfo.Rollup, l1Reader, txOptsValidator, int64(deployInfo.DeployedAt), func(common.Address) {}, getExtraGas)
 			if err != nil {
-				return nil, err
+				return nil, nil, common.Address{}, err
 			}
 		} else {
 			if len(config.Staker.ContractWalletAddress) > 0 {
-				return nil, errors.New("validator contract wallet specified but flag to use a smart contract wallet was not specified")
+				return nil, nil, common.Address{}, errors.New("validator contract wallet specified but flag to use a smart contract wallet was not specified")
 			}
 			wallet, err = validatorwallet.NewEOA(dp, deployInfo.Rollup, l1client, getExtraGas)
 			if err != nil {
-				return nil, err
+				return nil, nil, common.Address{}, err
 			}
 		}
 	}
@@ -721,19 +749,125 @@ func createNodeImpl(
 		stakerObj, err = multiprotocolstaker.NewMultiProtocolStaker(stack, l1Reader, wallet, bind.CallOpts{}, func() *legacystaker.L1ValidatorConfig { return &configFetcher.Get().Staker }, &configFetcher.Get().Bold, blockValidator, statelessBlockValidator, nil, deployInfo.StakeToken, confirmedNotifiers, deployInfo.ValidatorUtils, deployInfo.Bridge, fatalErrChan)
 		if err != nil {
-			return nil, err
+			return nil, nil, common.Address{}, err
 		}
 		if err := wallet.Initialize(ctx); err != nil {
-			return nil, err
+			return nil, nil, common.Address{}, err
 		}
 		if dp != nil {
 			stakerAddr = dp.Sender()
 		}
 	}
+	return stakerObj, messagePruner, stakerAddr, nil
+}
+
+func getTransactionStreamer(
+	arbDb ethdb.Database,
+	l2Config *params.ChainConfig,
+	exec execution.ExecutionClient,
+	broadcastServer *broadcaster.Broadcaster,
+	configFetcher ConfigFetcher,
+	fatalErrChan chan error,
+) (*TransactionStreamer, error) {
+	transactionStreamerConfigFetcher := func() *TransactionStreamerConfig { return &configFetcher.Get().TransactionStreamer }
+	txStreamer, err := NewTransactionStreamer(arbDb, l2Config, exec, broadcastServer, fatalErrChan, transactionStreamerConfigFetcher, &configFetcher.Get().SnapSyncTest)
+	if err != nil {
+		return nil, err
+	}
+	return txStreamer, nil
+}
+
+func getSeqCoordinator(
+	config *Config,
+	dataSigner signature.DataSignerFunc,
+	bpVerifier *contracts.AddressVerifier,
+	txStreamer *TransactionStreamer,
+	syncMonitor *SyncMonitor,
+	exec execution.ExecutionSequencer,
+) (*SeqCoordinator, error) {
+	var coordinator *SeqCoordinator
+	if config.SeqCoordinator.Enable {
+		if exec == nil {
+			return nil, errors.New("sequencer coordinator requires an execution sequencer")
+		}
+
+		var err error
+		coordinator, err = NewSeqCoordinator(dataSigner, bpVerifier, txStreamer, exec, syncMonitor, config.SeqCoordinator)
+		if err != nil {
+			return nil, err
+		}
+	} else if config.Sequencer && !config.Dangerous.NoSequencerCoordinator {
+		return nil, errors.New("sequencer must be enabled with coordinator, unless dangerous.no-sequencer-coordinator set")
+	}
+	return coordinator, nil
+}
+
+func getStatelessBlockValidator(
+	config *Config,
+	configFetcher ConfigFetcher,
+	inboxReader *InboxReader,
+	inboxTracker *InboxTracker,
+	txStreamer *TransactionStreamer,
+	exec execution.ExecutionRecorder,
+	arbDb ethdb.Database,
+	dapReaders []daprovider.Reader,
+	stack *node.Node,
+) (*staker.StatelessBlockValidator, error) {
+	var err error
+	var statelessBlockValidator *staker.StatelessBlockValidator
+	if config.BlockValidator.RedisValidationClientConfig.Enabled() || config.BlockValidator.ValidationServerConfigs[0].URL != "" {
+		if exec == nil {
+			return nil, errors.New("stateless block validator requires an execution recorder")
+		}
+
+		statelessBlockValidator, err = staker.NewStatelessBlockValidator(
+			inboxReader,
+			inboxTracker,
+			txStreamer,
+			exec,
+			rawdb.NewTable(arbDb, storage.BlockValidatorPrefix),
+			dapReaders,
+			func() *staker.BlockValidatorConfig { return &configFetcher.Get().BlockValidator },
+			stack,
+		)
+	} else {
+		err = errors.New("no validator url specified")
+	}
+	if err != nil {
+		if config.ValidatorRequired() || config.Staker.Enable {
+			return nil, fmt.Errorf("%w: failed to init block validator", err)
+		}
+		log.Warn("validation not supported", "err", err)
+		statelessBlockValidator = nil
+	}
+
+	return statelessBlockValidator, nil
+}
+
+func getBatchPoster(
+	ctx context.Context,
+	config *Config,
+	configFetcher ConfigFetcher,
+	txOptsBatchPoster *bind.TransactOpts,
+	daWriter das.DataAvailabilityServiceWriter,
+	l1Reader *headerreader.HeaderReader,
+	inboxTracker *InboxTracker,
+	txStreamer *TransactionStreamer,
+	exec execution.ExecutionBatchPoster,
+	arbDb ethdb.Database,
+	syncMonitor *SyncMonitor,
+	deployInfo *chaininfo.RollupAddresses,
+	parentChainID *big.Int,
+	dapReaders []daprovider.Reader,
+	stakerAddr common.Address,
+) (*BatchPoster, error) {
 	var batchPoster *BatchPoster
-	var delayedSequencer *DelayedSequencer
 	if config.BatchPoster.Enable {
+		if exec == nil {
+			return nil, errors.New("batch poster requires an execution batch poster")
+		}
+
 		if txOptsBatchPoster == nil && config.BatchPoster.DataPoster.ExternalSigner.URL == "" {
 			return nil, errors.New("batchposter, but no TxOpts")
 		}
@@ -741,6 +875,7 @@ func createNodeImpl(
 		if daWriter != nil {
 			dapWriter = daprovider.NewWriterForDAS(daWriter)
 		}
+		var err error
 		batchPoster, err = NewBatchPoster(ctx, &BatchPosterOpts{
 			DataPosterDB: rawdb.NewTable(arbDb, storage.BatchPosterPrefix),
 			L1Reader:     l1Reader,
@@ -765,8 +900,176 @@ func createNodeImpl(
 		}
 	}
 
-	// always create DelayedSequencer, it won't do anything if it is disabled
-	delayedSequencer, err = NewDelayedSequencer(l1Reader, inboxReader, exec, coordinator, func() *DelayedSequencerConfig { return &configFetcher.Get().DelayedSequencer })
+	return batchPoster, nil
+}
+
+func getDelayedSequencer(
+	l1Reader *headerreader.HeaderReader,
+	inboxReader *InboxReader,
+	exec execution.ExecutionSequencer,
+	configFetcher ConfigFetcher,
+	coordinator *SeqCoordinator,
+) (*DelayedSequencer, error) {
+	if exec == nil {
+		return nil, nil
+	}
+
+	// always create DelayedSequencer if exec is non-nil, it won't do anything if it is disabled
+	delayedSequencer, err := NewDelayedSequencer(l1Reader, inboxReader, exec, coordinator, func() *DelayedSequencerConfig { return &configFetcher.Get().DelayedSequencer })
+	if err != nil {
+		return nil, err
+	}
+	return delayedSequencer, nil
+}
+
+func getNodeParentChainReaderDisabled(
+	ctx context.Context,
+	arbDb ethdb.Database,
+	stack *node.Node,
+	executionClient execution.ExecutionClient,
+	executionSequencer execution.ExecutionSequencer,
+	executionRecorder execution.ExecutionRecorder,
+	txStreamer *TransactionStreamer,
+	blobReader daprovider.BlobReader,
+	broadcastServer *broadcaster.Broadcaster,
+	broadcastClients *broadcastclients.BroadcastClients,
+	coordinator *SeqCoordinator,
+	maintenanceRunner *MaintenanceRunner,
+	syncMonitor *SyncMonitor,
+	configFetcher ConfigFetcher,
+) *Node {
+	return &Node{
+		ArbDB:                   arbDb,
+		Stack:                   stack,
+		ExecutionClient:         executionClient,
+		ExecutionSequencer:      executionSequencer,
+		ExecutionRecorder:       executionRecorder,
+		L1Reader:                nil,
+		TxStreamer:              txStreamer,
+		DeployInfo:              nil,
+		BlobReader:              blobReader,
+		InboxReader:             nil,
+		InboxTracker:            nil,
+		DelayedSequencer:        nil,
+		BatchPoster:             nil,
+		MessagePruner:           nil,
+		BlockValidator:          nil,
+		StatelessBlockValidator: nil,
+		Staker:                  nil,
+		BroadcastServer:         broadcastServer,
+		BroadcastClients:        broadcastClients,
+		SeqCoordinator:          coordinator,
+		MaintenanceRunner:       maintenanceRunner,
+		DASLifecycleManager:     nil,
+		SyncMonitor:             syncMonitor,
+		configFetcher:           configFetcher,
+		ctx:                     ctx,
+	}
+}
+
+func createNodeImpl(
+	ctx context.Context,
+	stack *node.Node,
+	executionClient execution.ExecutionClient,
+	executionSequencer execution.ExecutionSequencer,
+	executionRecorder execution.ExecutionRecorder,
+	executionBatchPoster execution.ExecutionBatchPoster,
+	arbDb ethdb.Database,
+	configFetcher ConfigFetcher,
+	l2Config *params.ChainConfig,
+	l1client *ethclient.Client,
+	deployInfo *chaininfo.RollupAddresses,
+	txOptsValidator *bind.TransactOpts,
+	txOptsBatchPoster *bind.TransactOpts,
+	dataSigner signature.DataSignerFunc,
+	fatalErrChan chan error,
+	parentChainID *big.Int,
+	blobReader daprovider.BlobReader,
+) (*Node, error) {
+	config := configFetcher.Get()
+
+	err := checkArbDbSchemaVersion(arbDb)
+	if err != nil {
+		return nil, err
+	}
+
+	syncMonitor := getSyncMonitor(configFetcher)
+
+	l1Reader, err := getL1Reader(ctx, config, configFetcher, l1client)
+	if err != nil {
+		return nil, err
+	}
+
+	broadcastServer, err := getBroadcastServer(config, configFetcher, dataSigner, l2Config.ChainID.Uint64(), fatalErrChan)
+	if err != nil {
+		return nil, err
+	}
+
+	txStreamer, err := getTransactionStreamer(arbDb, l2Config, executionClient, broadcastServer, configFetcher, fatalErrChan)
+	if err != nil {
+		return nil, err
+	}
+
+	bpVerifier, err := getBPVerifier(deployInfo, l1client)
+	if err != nil {
+		return nil, err
+	}
+
+	coordinator, err := getSeqCoordinator(config, dataSigner, bpVerifier, txStreamer, syncMonitor, executionSequencer)
+	if err != nil {
+		return nil, err
+	}
+
+	maintenanceRunner, err := getMaintenanceRunner(arbDb, configFetcher, coordinator, executionClient)
+	if err != nil {
+		return nil, err
+	}
+
+	broadcastClients, err := getBroadcastClients(config, configFetcher, txStreamer, l2Config.ChainID.Uint64(), bpVerifier, fatalErrChan)
+	if err != nil {
+		return nil, err
+	}
+
+	if !config.ParentChainReader.Enable {
+		return getNodeParentChainReaderDisabled(ctx, arbDb, stack, executionClient, executionSequencer, executionRecorder, txStreamer, blobReader, broadcastServer, broadcastClients, coordinator, maintenanceRunner, syncMonitor, configFetcher), nil
+	}
+
+	delayedBridge, sequencerInbox, err := getDelayedBridgeAndSequencerInbox(deployInfo, l1client)
+	if err != nil {
+		return nil, err
+	}
+
+	daWriter, dasLifecycleManager, dapReaders, err := getDAS(ctx, config, l2Config, txStreamer, blobReader, l1Reader, deployInfo, dataSigner, l1client)
+	if err != nil {
+		return nil, err
+	}
+
+	inboxTracker, inboxReader, err := getInboxTrackerAndReader(ctx, executionSequencer, arbDb, txStreamer, dapReaders, config, configFetcher, l1client, l1Reader, deployInfo, delayedBridge, sequencerInbox)
+	if err != nil {
+		return nil, err
+	}
+
+	statelessBlockValidator, err := getStatelessBlockValidator(config, configFetcher, inboxReader, inboxTracker, txStreamer, executionRecorder, arbDb, dapReaders, stack)
+	if err != nil {
+		return nil, err
+	}
+
+	blockValidator, err := getBlockValidator(config, configFetcher, statelessBlockValidator, inboxTracker, txStreamer, fatalErrChan)
+	if err != nil {
+		return nil, err
+	}
+
+	stakerObj, messagePruner, stakerAddr, err := getStaker(ctx, config, configFetcher, arbDb, l1Reader, txOptsValidator, syncMonitor, parentChainID, l1client, deployInfo, txStreamer, inboxTracker, stack, fatalErrChan, statelessBlockValidator, blockValidator)
+	if err != nil {
+		return nil, err
+	}
+
+	batchPoster, err := getBatchPoster(ctx, config, configFetcher, txOptsBatchPoster, daWriter, l1Reader, inboxTracker, txStreamer, executionBatchPoster, arbDb, syncMonitor, deployInfo, parentChainID, dapReaders, stakerAddr)
+	if err != nil {
+		return nil, err
+	}
+
+	delayedSequencer, err := getDelayedSequencer(l1Reader, inboxReader, executionSequencer, configFetcher, coordinator)
 	if err != nil {
 		return nil, err
 	}
@@ -774,7 +1077,9 @@ func createNodeImpl(
 	return &Node{
 		ArbDB:                   arbDb,
 		Stack:                   stack,
-		Execution:               exec,
+		ExecutionClient:         executionClient,
+		ExecutionSequencer:      executionSequencer,
+		ExecutionRecorder:       executionRecorder,
 		L1Reader:                l1Reader,
 		TxStreamer:              txStreamer,
 		DeployInfo:              deployInfo,
@@ -850,26 +1155,7 @@ func (n *Node) OnConfigReload(_ *Config, _ *Config) error {
 	return nil
 }
 
-func CreateNode(
-	ctx context.Context,
-	stack *node.Node,
-	exec execution.FullExecutionClient,
-	arbDb ethdb.Database,
-	configFetcher ConfigFetcher,
-	l2Config *params.ChainConfig,
-	l1client *ethclient.Client,
-	deployInfo *chaininfo.RollupAddresses,
-	txOptsValidator *bind.TransactOpts,
-	txOptsBatchPoster *bind.TransactOpts,
-	dataSigner signature.DataSignerFunc,
-	fatalErrChan chan error,
-	parentChainID *big.Int,
-	blobReader daprovider.BlobReader,
-) (*Node, error) {
-	currentNode, err := createNodeImpl(ctx, stack, exec, arbDb, configFetcher, l2Config, l1client, deployInfo, txOptsValidator, txOptsBatchPoster, dataSigner, fatalErrChan, parentChainID, blobReader)
-	if err != nil {
-		return nil, err
-	}
+func registerAPIs(currentNode *Node, stack *node.Node) {
 	var apis []rpc.API
 	if currentNode.BlockValidator != nil {
 		apis = append(apis, rpc.API{
@@ -891,12 +1177,67 @@ func CreateNode(
 	}
 
 	stack.RegisterAPIs(apis)
+}
 
+func CreateNodeExecutionClient(
+	ctx context.Context,
+	stack *node.Node,
+	executionClient execution.ExecutionClient,
+	arbDb ethdb.Database,
+	configFetcher ConfigFetcher,
+	l2Config *params.ChainConfig,
+	l1client *ethclient.Client,
+	deployInfo *chaininfo.RollupAddresses,
+	txOptsValidator *bind.TransactOpts,
+	txOptsBatchPoster *bind.TransactOpts,
+	dataSigner signature.DataSignerFunc,
+	fatalErrChan chan error,
+	parentChainID *big.Int,
+	blobReader daprovider.BlobReader,
+) (*Node, error) {
+	if executionClient == nil {
+		return nil, errors.New("execution client must be non-nil")
+	}
+	currentNode, err := createNodeImpl(ctx, stack, executionClient, nil, nil, nil, arbDb, configFetcher, l2Config, l1client, deployInfo, txOptsValidator, txOptsBatchPoster, dataSigner, fatalErrChan, parentChainID, blobReader)
+	if err != nil {
+		return nil, err
+	}
+	registerAPIs(currentNode, stack)
+	return currentNode, nil
+}
+
+func CreateNodeFullExecutionClient(
+	ctx context.Context,
+	stack *node.Node,
+	executionClient execution.ExecutionClient,
+	executionSequencer execution.ExecutionSequencer,
+	executionRecorder execution.ExecutionRecorder,
+	executionBatchPoster execution.ExecutionBatchPoster,
+	arbDb ethdb.Database,
+	configFetcher ConfigFetcher,
+	l2Config *params.ChainConfig,
+	l1client *ethclient.Client,
+	deployInfo *chaininfo.RollupAddresses,
+	txOptsValidator *bind.TransactOpts,
+	txOptsBatchPoster *bind.TransactOpts,
+	dataSigner signature.DataSignerFunc,
+	fatalErrChan chan error,
+	parentChainID *big.Int,
+	blobReader daprovider.BlobReader,
+) (*Node, error) {
+	if (executionClient == nil) || (executionSequencer == nil) || (executionRecorder == nil) || (executionBatchPoster == nil) {
+		return nil, errors.New("execution client, sequencer, recorder, and batch poster must be non-nil")
+	}
+	currentNode, err := createNodeImpl(ctx, stack, executionClient, executionSequencer, executionRecorder, executionBatchPoster, arbDb, configFetcher, l2Config, l1client, deployInfo, txOptsValidator, txOptsBatchPoster, dataSigner, fatalErrChan, parentChainID, blobReader)
+	if err != nil {
+		return nil, err
+	}
+	registerAPIs(currentNode, stack)
 	return currentNode, nil
 }
 
 func (n *Node) Start(ctx context.Context) error {
-	execClient, ok := n.Execution.(*gethexec.ExecutionNode)
+	execClient, ok := n.ExecutionClient.(*gethexec.ExecutionNode)
 	if !ok {
 		execClient = nil
 	}
@@ -914,7 +1255,7 @@ func (n *Node) Start(ctx context.Context) error {
 	if execClient != nil {
 		execClient.SetConsensusClient(n)
 	}
-	err = n.Execution.Start(ctx)
+	_, err = n.ExecutionClient.Start(ctx).Await(ctx)
 	if err != nil {
 		return fmt.Errorf("error starting exec client: %w", err)
 	}
@@ -963,8 +1304,8 @@ func (n *Node) Start(ctx context.Context) error {
 	}
 	if n.SeqCoordinator != nil {
 		n.SeqCoordinator.Start(ctx)
-	} else {
-		n.Execution.Activate()
+	} else if n.ExecutionSequencer != nil {
+		n.ExecutionSequencer.Activate()
 	}
 	if n.MaintenanceRunner != nil {
 		n.MaintenanceRunner.Start(ctx)
@@ -1084,8 +1425,11 @@ func (n *Node) StopAndWait() {
 	if n.DASLifecycleManager != nil {
 		n.DASLifecycleManager.StopAndWaitUntil(2 * time.Second)
 	}
-	if n.Execution != nil {
-		n.Execution.StopAndWait()
+	if n.ExecutionClient != nil {
+		_, err := n.ExecutionClient.StopAndWait().Await(n.ctx)
+		if err != nil {
+			log.Error("error stopping execution client", "err", err)
+		}
 	}
 	if err := n.Stack.Close(); err != nil {
 		log.Error("error on stack close", "err", err)
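Stepping back from the mechanical moves in node.go: createNodeImpl now receives the four execution roles separately, and the two exported constructors differ only in which roles may be nil (CreateNodeExecutionClient passes nil for the sequencer, recorder, and batch poster, and the helpers above each guard against a missing role). A toy model of that wiring, with illustrative names rather than the real types:

package main

import "fmt"

// Narrow role interfaces, in the spirit of execution/interface.go.
type executionClient interface{ HeadMessageNumber() uint64 }
type executionSequencer interface{ Activate() }

// engine satisfies both roles, as gethexec.ExecutionNode satisfies all four.
type engine struct{ head uint64 }

func (e *engine) HeadMessageNumber() uint64 { return e.head }
func (e *engine) Activate()                 { fmt.Println("sequencer activated") }

type node struct {
	client    executionClient    // always required
	sequencer executionSequencer // nil for an ExecutionClient-only node
}

func (n *node) start() {
	fmt.Println("head:", n.client.HeadMessageNumber())
	// Mirrors Node.Start above: only touch the sequencer role if provided.
	if n.sequencer != nil {
		n.sequencer.Activate()
	}
}

func main() {
	e := &engine{head: 42}
	full := &node{client: e, sequencer: e} // CreateNodeFullExecutionClient-style
	replica := &node{client: e}            // CreateNodeExecutionClient-style
	full.start()
	replica.start()
}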
diff --git a/arbnode/transaction_streamer.go b/arbnode/transaction_streamer.go
index 1a961ebd3f..d845bcd477 100644
--- a/arbnode/transaction_streamer.go
+++ b/arbnode/transaction_streamer.go
@@ -44,7 +44,7 @@ type TransactionStreamer struct {
 	stopwaiter.StopWaiter
 
 	chainConfig      *params.ChainConfig
-	exec             execution.ExecutionSequencer
+	exec             execution.ExecutionClient
 	execLastMsgCount arbutil.MessageIndex
 	validator        *staker.BlockValidator
@@ -98,7 +98,7 @@ func TransactionStreamerConfigAddOptions(prefix string, f *flag.FlagSet) {
 func NewTransactionStreamer(
 	db ethdb.Database,
 	chainConfig *params.ChainConfig,
-	exec execution.ExecutionSequencer,
+	exec execution.ExecutionClient,
 	broadcastServer *broadcaster.Broadcaster,
 	fatalErrChan chan<- error,
 	config TransactionStreamerConfigFetcher,
@@ -353,7 +353,7 @@ func (s *TransactionStreamer) reorg(batch ethdb.Batch, count arbutil.MessageInde
 	s.reorgMutex.Lock()
 	defer s.reorgMutex.Unlock()
 
-	messagesResults, err := s.exec.Reorg(count, newMessages, oldMessages)
+	messagesResults, err := s.exec.Reorg(count, newMessages, oldMessages).Await(s.GetContext())
 	if err != nil {
 		return err
 	}
@@ -497,7 +497,7 @@ func (s *TransactionStreamer) GetProcessedMessageCount() (arbutil.MessageIndex,
 	if err != nil {
 		return 0, err
 	}
-	digestedHead, err := s.exec.HeadMessageNumber()
+	digestedHead, err := s.exec.HeadMessageNumber().Await(s.GetContext())
 	if err != nil {
 		return 0, err
 	}
@@ -674,7 +674,10 @@ func (s *TransactionStreamer) AddMessagesAndEndBatch(pos arbutil.MessageIndex, m
 
 	if messagesAreConfirmed {
 		// Trim confirmed messages from l1pricedataCache
-		s.exec.MarkFeedStart(pos + arbutil.MessageIndex(len(messages)))
+		_, err := s.exec.MarkFeedStart(pos + arbutil.MessageIndex(len(messages))).Await(s.GetContext())
+		if err != nil {
+			log.Warn("TransactionStreamer: failed to mark feed start", "pos", pos, "err", err)
+		}
 		s.reorgMutex.RLock()
 		dups, _, _, err := s.countDuplicateMessages(pos, messagesWithBlockHash, nil)
 		s.reorgMutex.RUnlock()
@@ -1090,7 +1093,11 @@ func (s *TransactionStreamer) ResultAtCount(count arbutil.MessageIndex) (*execut
 	}
 	log.Info(FailedToGetMsgResultFromDB, "count", count)
 
-	msgResult, err := s.exec.ResultAtPos(pos)
+	ctx := context.Background()
+	if s.Started() {
+		ctx = s.GetContext()
+	}
+	msgResult, err := s.exec.ResultAtPos(pos).Await(ctx)
 	if err != nil {
 		return nil, err
 	}
@@ -1154,7 +1161,7 @@ func (s *TransactionStreamer) ExecuteNextMsg(ctx context.Context) bool {
 		return false
 	}
 	s.execLastMsgCount = msgCount
-	pos, err := s.exec.HeadMessageNumber()
+	pos, err := s.exec.HeadMessageNumber().Await(ctx)
 	if err != nil {
 		log.Error("feedOneMsg failed to get exec engine message count", "err", err)
 		return false
@@ -1177,7 +1184,7 @@ func (s *TransactionStreamer) ExecuteNextMsg(ctx context.Context) bool {
 		}
 		msgForPrefetch = msg
 	}
-	msgResult, err := s.exec.DigestMessage(pos, &msgAndBlockHash.MessageWithMeta, msgForPrefetch)
+	msgResult, err := s.exec.DigestMessage(pos, &msgAndBlockHash.MessageWithMeta, msgForPrefetch).Await(ctx)
 	if err != nil {
 		logger := log.Warn
 		if prevMessageCount < msgCount {
diff --git a/cmd/nitro/nitro.go b/cmd/nitro/nitro.go
index e4e1b79353..03cc811e16 100644
--- a/cmd/nitro/nitro.go
+++ b/cmd/nitro/nitro.go
@@ -534,10 +534,13 @@ func mainImpl() int {
 		return 1
 	}
 
-	currentNode, err := arbnode.CreateNode(
+	currentNode, err := arbnode.CreateNodeFullExecutionClient(
 		ctx,
 		stack,
 		execNode,
+		execNode,
+		execNode,
+		execNode,
 		arbDb,
 		&NodeConfigFetcher{liveNodeConfig},
 		l2BlockChain.Config(),
diff --git a/execution/gethexec/node.go b/execution/gethexec/node.go
index 5030de0cfa..b937f9fdb9 100644
--- a/execution/gethexec/node.go
+++ b/execution/gethexec/node.go
@@ -29,6 +29,7 @@ import (
 	"github.com/offchainlabs/nitro/arbutil"
 	"github.com/offchainlabs/nitro/execution"
 	"github.com/offchainlabs/nitro/solgen/go/precompilesgen"
+	"github.com/offchainlabs/nitro/util/containers"
 	"github.com/offchainlabs/nitro/util/dbutil"
 	"github.com/offchainlabs/nitro/util/headerreader"
 )
@@ -312,8 +313,12 @@ func CreateExecutionNode(
 
 }
 
-func (n *ExecutionNode) MarkFeedStart(to arbutil.MessageIndex) {
-	n.ExecEngine.MarkFeedStart(to)
+func (n *ExecutionNode) MarkFeedStart(to arbutil.MessageIndex) containers.PromiseInterface[struct{}] {
+	markFeedStartWithReturn := func(to arbutil.MessageIndex) (struct{}, error) {
+		n.ExecEngine.MarkFeedStart(to)
+		return struct{}{}, nil
+	}
+	return containers.NewReadyPromise(markFeedStartWithReturn(to))
 }
 
 func (n *ExecutionNode) Initialize(ctx context.Context) error {
@@ -339,9 +344,9 @@ func (n *ExecutionNode) Initialize(ctx context.Context) error {
 }
 
 // not thread safe
-func (n *ExecutionNode) Start(ctx context.Context) error {
+func (n *ExecutionNode) Start(ctx context.Context) containers.PromiseInterface[struct{}] {
 	if n.started.Swap(true) {
-		return errors.New("already started")
+		return containers.NewReadyPromise(struct{}{}, errors.New("already started"))
 	}
 	// TODO after separation
 	// err := n.Stack.Start()
@@ -351,17 +356,17 @@ func (n *ExecutionNode) Start(ctx context.Context)
 	n.ExecEngine.Start(ctx)
 	err := n.TxPublisher.Start(ctx)
 	if err != nil {
-		return fmt.Errorf("error starting transaction puiblisher: %w", err)
+		return containers.NewReadyPromise(struct{}{}, fmt.Errorf("error starting transaction publisher: %w", err))
 	}
 	if n.ParentChainReader != nil {
 		n.ParentChainReader.Start(ctx)
 	}
-	return nil
+	return containers.NewReadyPromise(struct{}{}, nil)
 }
 
-func (n *ExecutionNode) StopAndWait() {
+func (n *ExecutionNode) StopAndWait() containers.PromiseInterface[struct{}] {
 	if !n.started.Load() {
-		return
+		return containers.NewReadyPromise(struct{}{}, nil)
 	}
 	// TODO after separation
 	// n.Stack.StopRPC() // does nothing if not running
@@ -383,19 +388,21 @@ func (n *ExecutionNode) StopAndWait() {
 	// if err := n.Stack.Close(); err != nil {
 	// 	log.Error("error on stak close", "err", err)
 	// }
+
+	return containers.NewReadyPromise(struct{}{}, nil)
 }
 
-func (n *ExecutionNode) DigestMessage(num arbutil.MessageIndex, msg *arbostypes.MessageWithMetadata, msgForPrefetch *arbostypes.MessageWithMetadata) (*execution.MessageResult, error) {
-	return n.ExecEngine.DigestMessage(num, msg, msgForPrefetch)
+func (n *ExecutionNode) DigestMessage(num arbutil.MessageIndex, msg *arbostypes.MessageWithMetadata, msgForPrefetch *arbostypes.MessageWithMetadata) containers.PromiseInterface[*execution.MessageResult] {
+	return containers.NewReadyPromise(n.ExecEngine.DigestMessage(num, msg, msgForPrefetch))
 }
 
-func (n *ExecutionNode) Reorg(count arbutil.MessageIndex, newMessages []arbostypes.MessageWithMetadataAndBlockHash, oldMessages []*arbostypes.MessageWithMetadata) ([]*execution.MessageResult, error) {
-	return n.ExecEngine.Reorg(count, newMessages, oldMessages)
+func (n *ExecutionNode) Reorg(count arbutil.MessageIndex, newMessages []arbostypes.MessageWithMetadataAndBlockHash, oldMessages []*arbostypes.MessageWithMetadata) containers.PromiseInterface[[]*execution.MessageResult] {
+	return containers.NewReadyPromise(n.ExecEngine.Reorg(count, newMessages, oldMessages))
 }
 
-func (n *ExecutionNode) HeadMessageNumber() (arbutil.MessageIndex, error) {
-	return n.ExecEngine.HeadMessageNumber()
+func (n *ExecutionNode) HeadMessageNumber() containers.PromiseInterface[arbutil.MessageIndex] {
+	return containers.NewReadyPromise(n.ExecEngine.HeadMessageNumber())
 }
 
-func (n *ExecutionNode) HeadMessageNumberSync(t *testing.T) (arbutil.MessageIndex, error) {
-	return n.ExecEngine.HeadMessageNumberSync(t)
+func (n *ExecutionNode) HeadMessageNumberSync(t *testing.T) containers.PromiseInterface[arbutil.MessageIndex] {
+	return containers.NewReadyPromise(n.ExecEngine.HeadMessageNumberSync(t))
 }
 
 func (n *ExecutionNode) NextDelayedMessageNumber() (uint64, error) {
 	return n.ExecEngine.NextDelayedMessageNumber()
 }
@@ -403,8 +410,8 @@ func (n *ExecutionNode) NextDelayedMessageNumber() (uint64, error) {
 func (n *ExecutionNode) SequenceDelayedMessage(message *arbostypes.L1IncomingMessage, delayedSeqNum uint64) error {
 	return n.ExecEngine.SequenceDelayedMessage(message, delayedSeqNum)
 }
-func (n *ExecutionNode) ResultAtPos(pos arbutil.MessageIndex) (*execution.MessageResult, error) {
-	return n.ExecEngine.ResultAtPos(pos)
+func (n *ExecutionNode) ResultAtPos(pos arbutil.MessageIndex) containers.PromiseInterface[*execution.MessageResult] {
+	return containers.NewReadyPromise(n.ExecEngine.ResultAtPos(pos))
 }
 func (n *ExecutionNode) ArbOSVersionForMessageNumber(messageNum arbutil.MessageIndex) (uint64, error) {
 	return n.ExecEngine.ArbOSVersionForMessageNumber(messageNum)
@@ -453,8 +460,12 @@ func (n *ExecutionNode) MessageIndexToBlockNumber(messageNum arbutil.MessageInde
 	return n.ExecEngine.MessageIndexToBlockNumber(messageNum)
 }
 
-func (n *ExecutionNode) Maintenance() error {
-	return n.ChainDB.Compact(nil, nil)
+func (n *ExecutionNode) Maintenance() containers.PromiseInterface[struct{}] {
+	compactWithReturn := func() (struct{}, error) {
+		err := n.ChainDB.Compact(nil, nil)
+		return struct{}{}, err
+	}
+	return containers.NewReadyPromise(compactWithReturn())
 }
 
 func (n *ExecutionNode) Synced() bool {
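One hedged suggestion while these files are being touched (not part of this diff): since cmd/nitro now passes the same *ExecutionNode for all four roles, compile-time assertions in execution/gethexec would turn any future drift between ExecutionNode and the role interfaces into a build error rather than a failure at a distant call site:

package gethexec

import "github.com/offchainlabs/nitro/execution"

// Compile-time checks that ExecutionNode implements every role the new
// constructors expect; each line costs nothing at runtime.
var (
	_ execution.ExecutionClient      = (*ExecutionNode)(nil)
	_ execution.ExecutionSequencer   = (*ExecutionNode)(nil)
	_ execution.ExecutionRecorder    = (*ExecutionNode)(nil)
	_ execution.ExecutionBatchPoster = (*ExecutionNode)(nil)
)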
diff --git a/execution/interface.go b/execution/interface.go
index c0aa71c146..dcf6fec72a 100644
--- a/execution/interface.go
+++ b/execution/interface.go
@@ -3,13 +3,13 @@ package execution
 import (
 	"context"
 	"errors"
-	"testing"
 
 	"github.com/ethereum/go-ethereum/common"
 	"github.com/ethereum/go-ethereum/core/state"
 
 	"github.com/offchainlabs/nitro/arbos/arbostypes"
 	"github.com/offchainlabs/nitro/arbutil"
+	"github.com/offchainlabs/nitro/util/containers"
 )
 
 type MessageResult struct {
@@ -29,11 +29,16 @@ var ErrSequencerInsertLockTaken = errors.New("insert lock taken")
 
 // always needed
 type ExecutionClient interface {
-	DigestMessage(num arbutil.MessageIndex, msg *arbostypes.MessageWithMetadata, msgForPrefetch *arbostypes.MessageWithMetadata) (*MessageResult, error)
-	Reorg(count arbutil.MessageIndex, newMessages []arbostypes.MessageWithMetadataAndBlockHash, oldMessages []*arbostypes.MessageWithMetadata) ([]*MessageResult, error)
-	HeadMessageNumber() (arbutil.MessageIndex, error)
-	HeadMessageNumberSync(t *testing.T) (arbutil.MessageIndex, error)
-	ResultAtPos(pos arbutil.MessageIndex) (*MessageResult, error)
+	DigestMessage(num arbutil.MessageIndex, msg *arbostypes.MessageWithMetadata, msgForPrefetch *arbostypes.MessageWithMetadata) containers.PromiseInterface[*MessageResult]
+	Reorg(count arbutil.MessageIndex, newMessages []arbostypes.MessageWithMetadataAndBlockHash, oldMessages []*arbostypes.MessageWithMetadata) containers.PromiseInterface[[]*MessageResult]
+	HeadMessageNumber() containers.PromiseInterface[arbutil.MessageIndex]
+	ResultAtPos(pos arbutil.MessageIndex) containers.PromiseInterface[*MessageResult]
+	MarkFeedStart(to arbutil.MessageIndex) containers.PromiseInterface[struct{}]
+
+	Maintenance() containers.PromiseInterface[struct{}]
+
+	Start(ctx context.Context) containers.PromiseInterface[struct{}]
+	StopAndWait() containers.PromiseInterface[struct{}]
 }
 
 // needed for validators / stakers
@@ -55,21 +60,12 @@ type ExecutionSequencer interface {
 	ForwardTo(url string) error
 	SequenceDelayedMessage(message *arbostypes.L1IncomingMessage, delayedSeqNum uint64) error
 	NextDelayedMessageNumber() (uint64, error)
-	MarkFeedStart(to arbutil.MessageIndex)
 	Synced() bool
 	FullSyncProgressMap() map[string]interface{}
 }
 
-type FullExecutionClient interface {
-	ExecutionClient
-	ExecutionRecorder
-	ExecutionSequencer
-
-	Start(ctx context.Context) error
-	StopAndWait()
-
-	Maintenance() error
-
+// needed for batch poster
+type ExecutionBatchPoster interface {
 	ArbOSVersionForMessageNumber(messageNum arbutil.MessageIndex) (uint64, error)
 }
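The interface change above forces a uniform call-site migration: a result that used to come back directly now arrives through a promise, and the caller picks the context it is willing to wait on. A hypothetical helper showing the new convention (with today's ready promises, Await returns immediately; once implementations become genuinely asynchronous, the context bounds the wait):

package example

import (
	"context"
	"time"

	"github.com/offchainlabs/nitro/arbutil"
	"github.com/offchainlabs/nitro/execution"
)

// headWithTimeout wraps the promise-based call with a deadline chosen by the
// caller; exec.HeadMessageNumber() itself no longer blocks on the result.
func headWithTimeout(exec execution.ExecutionClient, d time.Duration) (arbutil.MessageIndex, error) {
	ctx, cancel := context.WithTimeout(context.Background(), d)
	defer cancel()
	return exec.HeadMessageNumber().Await(ctx)
}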
diff --git a/system_tests/block_validator_test.go b/system_tests/block_validator_test.go
index d6ae4973ac..7dd2d50daf 100644
--- a/system_tests/block_validator_test.go
+++ b/system_tests/block_validator_test.go
@@ -247,7 +247,7 @@ func testBlockValidatorSimple(t *testing.T, opts Options) {
 	if !testClientB.ConsensusNode.BlockValidator.WaitForPos(t, ctx, arbutil.MessageIndex(lastBlock.NumberU64()), timeout) {
 		Fatal(t, "did not validate all blocks")
 	}
-	gethExec, ok := testClientB.ConsensusNode.Execution.(*gethexec.ExecutionNode)
+	gethExec, ok := testClientB.ConsensusNode.ExecutionClient.(*gethexec.ExecutionNode)
 	if !ok {
 		t.Fail()
 	}
diff --git a/system_tests/bold_challenge_protocol_test.go b/system_tests/bold_challenge_protocol_test.go
index 83700fc838..bbc0bde2fe 100644
--- a/system_tests/bold_challenge_protocol_test.go
+++ b/system_tests/bold_challenge_protocol_test.go
@@ -138,9 +138,9 @@ func testChallengeProtocolBOLD(t *testing.T, spawnerOpts ...server_arb.SpawnerOp
 	)
 	defer l2nodeB.StopAndWait()
 
-	genesisA, err := l2nodeA.Execution.ResultAtPos(0)
+	genesisA, err := l2nodeA.ExecutionClient.ResultAtPos(0).Await(ctx)
 	Require(t, err)
-	genesisB, err := l2nodeB.Execution.ResultAtPos(0)
+	genesisB, err := l2nodeB.ExecutionClient.ResultAtPos(0).Await(ctx)
 	Require(t, err)
 	if genesisA.BlockHash != genesisB.BlockHash {
 		Fatal(t, "genesis blocks mismatch between nodes")
@@ -160,7 +160,7 @@ func testChallengeProtocolBOLD(t *testing.T, spawnerOpts ...server_arb.SpawnerOp
 		l2nodeA.InboxReader,
 		l2nodeA.InboxTracker,
 		l2nodeA.TxStreamer,
-		l2nodeA.Execution,
+		l2nodeA.ExecutionRecorder,
 		l2nodeA.ArbDB,
 		nil,
 		StaticFetcherFrom(t, &blockValidatorConfig),
@@ -175,7 +175,7 @@ func testChallengeProtocolBOLD(t *testing.T, spawnerOpts ...server_arb.SpawnerOp
 		l2nodeB.InboxReader,
 		l2nodeB.InboxTracker,
 		l2nodeB.TxStreamer,
-		l2nodeB.Execution,
+		l2nodeB.ExecutionRecorder,
 		l2nodeB.ArbDB,
 		nil,
 		StaticFetcherFrom(t, &blockValidatorConfig),
@@ -331,11 +331,11 @@ func testChallengeProtocolBOLD(t *testing.T, spawnerOpts ...server_arb.SpawnerOp
 	t.Logf("Node B batch count %d, msgs %d", bcB, msgB)
 
 	// Wait for both nodes' chains to catch up.
-	nodeAExec, ok := l2nodeA.Execution.(*gethexec.ExecutionNode)
+	nodeAExec, ok := l2nodeA.ExecutionClient.(*gethexec.ExecutionNode)
 	if !ok {
 		Fatal(t, "not geth execution node")
 	}
-	nodeBExec, ok := l2nodeB.Execution.(*gethexec.ExecutionNode)
+	nodeBExec, ok := l2nodeB.ExecutionClient.(*gethexec.ExecutionNode)
 	if !ok {
 		Fatal(t, "not geth execution node")
 	}
@@ -617,8 +617,8 @@ func createTestNodeOnL1ForBoldProtocol(
 	parentChainId, err := l1client.ChainID(ctx)
 	Require(t, err)
 
-	currentNode, err = arbnode.CreateNode(
-		ctx, l2stack, execNode, l2arbDb, NewFetcherFromConfig(nodeConfig), l2blockchain.Config(), l1client,
+	currentNode, err = arbnode.CreateNodeFullExecutionClient(
+		ctx, l2stack, execNode, execNode, execNode, execNode, l2arbDb, NewFetcherFromConfig(nodeConfig), l2blockchain.Config(), l1client,
 		addresses, sequencerTxOptsPtr, sequencerTxOptsPtr, dataSigner, fatalErrChan, parentChainId,
 		nil, // Blob reader.
 	)
@@ -778,7 +778,7 @@ func create2ndNodeWithConfigForBoldProtocol(
 	fatalErrChan := make(chan error, 10)
 	l1rpcClient := l1stack.Attach()
 	l1client := ethclient.NewClient(l1rpcClient)
-	firstExec, ok := first.Execution.(*gethexec.ExecutionNode)
+	firstExec, ok := first.ExecutionClient.(*gethexec.ExecutionNode)
 	if !ok {
 		Fatal(t, "not geth execution node")
 	}
@@ -827,7 +827,7 @@ func create2ndNodeWithConfigForBoldProtocol(
 	Require(t, err)
 	l1ChainId, err := l1client.ChainID(ctx)
 	Require(t, err)
-	l2node, err := arbnode.CreateNode(ctx, l2stack, execNode, l2arbDb, NewFetcherFromConfig(nodeConfig), l2blockchain.Config(), l1client, addresses, &txOpts, &txOpts, dataSigner, fatalErrChan, l1ChainId, nil /* blob reader */)
+	l2node, err := arbnode.CreateNodeFullExecutionClient(ctx, l2stack, execNode, execNode, execNode, execNode, l2arbDb, NewFetcherFromConfig(nodeConfig), l2blockchain.Config(), l1client, addresses, &txOpts, &txOpts, dataSigner, fatalErrChan, l1ChainId, nil /* blob reader */)
 	Require(t, err)
 
 	l2client := ClientForStack(t, l2stack)
diff --git a/system_tests/bold_state_provider_test.go b/system_tests/bold_state_provider_test.go
index 0ecce5ba64..cd137ea5b2 100644
--- a/system_tests/bold_state_provider_test.go
+++ b/system_tests/bold_state_provider_test.go
@@ -380,7 +380,7 @@ func setupBoldStateProvider(t *testing.T, ctx context.Context, blockChallengeHei
 		l2node.InboxReader,
 		l2node.InboxTracker,
 		l2node.TxStreamer,
-		l2node.Execution,
+		l2node.ExecutionRecorder,
 		l2node.ArbDB,
 		nil,
 		StaticFetcherFrom(t, &blockValidatorConfig),
diff --git a/system_tests/common_test.go b/system_tests/common_test.go
index d3d4b33ab9..faa1c088f7 100644
--- a/system_tests/common_test.go
+++ b/system_tests/common_test.go
@@ -92,13 +92,14 @@ import (
 type info = *BlockchainTestInfo
 
 type SecondNodeParams struct {
-	nodeConfig   *arbnode.Config
-	execConfig   *gethexec.Config
-	stackConfig  *node.Config
-	dasConfig    *das.DataAvailabilityConfig
-	initData     *statetransfer.ArbosInitializationInfo
-	addresses    *chaininfo.RollupAddresses
-	wasmCacheTag uint32
+	nodeConfig             *arbnode.Config
+	execConfig             *gethexec.Config
+	stackConfig            *node.Config
+	dasConfig              *das.DataAvailabilityConfig
+	initData               *statetransfer.ArbosInitializationInfo
+	addresses              *chaininfo.RollupAddresses
+	wasmCacheTag           uint32
+	useExecutionClientOnly bool
 }
 
 type TestClient struct {
@@ -502,8 +503,8 @@ func buildOnParentChain(
 	Require(t, err)
 
 	fatalErrChan := make(chan error, 10)
-	chainTestClient.ConsensusNode, err = arbnode.CreateNode(
-		ctx, chainTestClient.Stack, execNode, arbDb, NewFetcherFromConfig(nodeConfig), blockchain.Config(), parentChainTestClient.Client,
+	chainTestClient.ConsensusNode, err = arbnode.CreateNodeFullExecutionClient(
+		ctx, chainTestClient.Stack, execNode, execNode, execNode, execNode, arbDb, NewFetcherFromConfig(nodeConfig), blockchain.Config(), parentChainTestClient.Client,
 		addresses, validatorTxOptsPtr, sequencerTxOptsPtr, dataSigner, fatalErrChan, parentChainId, nil)
 	Require(t, err)
@@ -624,8 +625,8 @@ func (b *NodeBuilder) BuildL2(t *testing.T) func() {
 	Require(t, err)
 
 	fatalErrChan := make(chan error, 10)
-	b.L2.ConsensusNode, err = arbnode.CreateNode(
-		b.ctx, b.L2.Stack, execNode, arbDb, NewFetcherFromConfig(b.nodeConfig), blockchain.Config(),
+	b.L2.ConsensusNode, err = arbnode.CreateNodeFullExecutionClient(
+		b.ctx, b.L2.Stack, execNode, execNode, execNode, execNode, arbDb, NewFetcherFromConfig(b.nodeConfig), blockchain.Config(),
 		nil, nil, nil, nil, nil, fatalErrChan, big.NewInt(1337), nil)
 	Require(t, err)
@@ -673,7 +674,7 @@ func (b *NodeBuilder) RestartL2Node(t *testing.T) {
 	Require(t, err)
 
 	feedErrChan := make(chan error, 10)
-	currentNode, err := arbnode.CreateNode(b.ctx, stack, execNode, arbDb, NewFetcherFromConfig(b.nodeConfig), blockchain.Config(), nil, nil, nil, nil, nil, feedErrChan, big.NewInt(1337), nil)
+	currentNode, err := arbnode.CreateNodeFullExecutionClient(b.ctx, stack, execNode, execNode, execNode, execNode, arbDb, NewFetcherFromConfig(b.nodeConfig), blockchain.Config(), nil, nil, nil, nil, nil, feedErrChan, big.NewInt(1337), nil)
 	Require(t, err)
 
 	Require(t, currentNode.Start(b.ctx))
@@ -743,7 +744,7 @@ func build2ndNode(
 	testClient := NewTestClient(ctx)
 	testClient.Client, testClient.ConsensusNode =
-		Create2ndNodeWithConfig(t, ctx, firstNodeTestClient.ConsensusNode, parentChainTestClient.Stack, parentChainInfo, params.initData, params.nodeConfig, params.execConfig, params.stackConfig, valnodeConfig, params.addresses, initMessage, params.wasmCacheTag)
+		Create2ndNodeWithConfig(t, ctx, firstNodeTestClient.ConsensusNode, parentChainTestClient.Stack, parentChainInfo, params.initData, params.nodeConfig, params.execConfig, params.stackConfig, valnodeConfig, params.addresses, initMessage, params.wasmCacheTag, params.useExecutionClientOnly)
 	testClient.ExecNode = getExecNode(t, testClient.ConsensusNode)
 	testClient.cleanup = func() { testClient.ConsensusNode.StopAndWait() }
 	return testClient, func() { testClient.cleanup() }
@@ -1509,6 +1510,7 @@ func Create2ndNodeWithConfig(
 	addresses *chaininfo.RollupAddresses,
 	initMessage *arbostypes.ParsedInitMessage,
 	wasmCacheTag uint32,
+	useExecutionClientOnly bool,
 ) (*ethclient.Client, *arbnode.Node) {
 	if nodeConfig == nil {
 		nodeConfig = arbnode.ConfigDefaultL1NonSequencerTest()
@@ -1556,7 +1558,13 @@ func Create2ndNodeWithConfig(
 	currentExec, err := gethexec.CreateExecutionNode(ctx, chainStack, chainDb, blockchain, parentChainClient, configFetcher)
 	Require(t, err)
 
-	currentNode, err := arbnode.CreateNode(ctx, chainStack, currentExec, arbDb, NewFetcherFromConfig(nodeConfig), blockchain.Config(), parentChainClient, addresses, &validatorTxOpts, &sequencerTxOpts, dataSigner, feedErrChan, big.NewInt(1337), nil)
+	var currentNode *arbnode.Node
+	if useExecutionClientOnly {
+		currentNode, err = arbnode.CreateNodeExecutionClient(ctx, chainStack, currentExec, arbDb, NewFetcherFromConfig(nodeConfig), blockchain.Config(), parentChainClient, addresses, &validatorTxOpts, &sequencerTxOpts, dataSigner, feedErrChan, big.NewInt(1337), nil)
+	} else {
+		currentNode, err = arbnode.CreateNodeFullExecutionClient(ctx, chainStack, currentExec, currentExec, currentExec, currentExec, arbDb, NewFetcherFromConfig(nodeConfig), blockchain.Config(), parentChainClient, addresses, &validatorTxOpts, &sequencerTxOpts, dataSigner, feedErrChan, big.NewInt(1337), nil)
+	}
+
 	Require(t, err)
 
 	err = currentNode.Start(ctx)
@@ -1808,7 +1816,7 @@ func TestMain(m *testing.M) {
 
 func getExecNode(t *testing.T, node *arbnode.Node) *gethexec.ExecutionNode {
 	t.Helper()
-	gethExec, ok := node.Execution.(*gethexec.ExecutionNode)
+	gethExec, ok := node.ExecutionClient.(*gethexec.ExecutionNode)
 	if !ok {
 		t.Fatal("failed to get exec node from arbnode")
 	}
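The useExecutionClientOnly knob added above is exercised by the new test that follows: opting in routes Create2ndNodeWithConfig through CreateNodeExecutionClient, so the second node gets no sequencer, recorder, or batch-poster role. The typical opt-in inside a system test looks like this (a fragment in the spirit of the test below, assuming a builder from NewNodeBuilder(ctx).DefaultConfig(t, true) whose first node is already built):

replicaConfig := arbnode.ConfigDefaultL1NonSequencerTest()
replica, cleanup := builder.Build2ndNode(t, &SecondNodeParams{
	nodeConfig:             replicaConfig,
	useExecutionClientOnly: true, // wire through CreateNodeExecutionClient
})
defer cleanup()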
diff --git a/system_tests/execution_client_only_test.go b/system_tests/execution_client_only_test.go
new file mode 100644
index 0000000000..22c76b868f
--- /dev/null
+++ b/system_tests/execution_client_only_test.go
@@ -0,0 +1,46 @@
+// Copyright 2021-2024, Offchain Labs, Inc.
+// For license information, see https://github.com/OffchainLabs/nitro/blob/master/LICENSE
+
+package arbtest
+
+import (
+	"context"
+	"math/big"
+	"testing"
+	"time"
+
+	"github.com/offchainlabs/nitro/arbnode"
+)
+
+func TestExecutionClientOnly(t *testing.T) {
+	t.Parallel()
+
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+
+	builder := NewNodeBuilder(ctx).DefaultConfig(t, true)
+	cleanup := builder.Build(t)
+	defer cleanup()
+	seqTestClient := builder.L2
+
+	replicaExecutionClientOnlyConfig := arbnode.ConfigDefaultL1NonSequencerTest()
+	replicaExecutionClientOnlyTestClient, replicaExecutionClientOnlyCleanup := builder.Build2ndNode(t, &SecondNodeParams{nodeConfig: replicaExecutionClientOnlyConfig, useExecutionClientOnly: true})
+	defer replicaExecutionClientOnlyCleanup()
+
+	builder.L2Info.GenerateAccount("User2")
+	for i := 0; i < 3; i++ {
+		tx := builder.L2Info.PrepareTx("Owner", "User2", builder.L2Info.TransferGas, big.NewInt(1e12), nil)
+		err := seqTestClient.Client.SendTransaction(ctx, tx)
+		Require(t, err)
+		_, err = seqTestClient.EnsureTxSucceeded(tx)
+		Require(t, err)
+		_, err = WaitForTx(ctx, replicaExecutionClientOnlyTestClient.Client, tx.Hash(), time.Second*15)
+		Require(t, err)
+	}
+
+	replicaBalance, err := replicaExecutionClientOnlyTestClient.Client.BalanceAt(ctx, builder.L2Info.GetAddress("User2"), nil)
+	Require(t, err)
+	if replicaBalance.Cmp(big.NewInt(3e12)) != 0 {
+		t.Fatal("Unexpected balance:", replicaBalance)
+	}
+}
diff --git a/system_tests/meaningless_reorg_test.go b/system_tests/meaningless_reorg_test.go
index 350b21a6cf..47cbcad3d7 100644
--- a/system_tests/meaningless_reorg_test.go
+++ b/system_tests/meaningless_reorg_test.go
@@ -63,7 +63,7 @@ func TestMeaninglessBatchReorg(t *testing.T) {
 		builder.L1.TransferBalance(t, "Faucet", "Faucet", common.Big1, builder.L1Info)
 	}
 
-	compareAllMsgResultsFromConsensusAndExecution(t, builder.L2, "before reorg")
+	compareAllMsgResultsFromConsensusAndExecution(t, ctx, builder.L2, "before reorg")
 
 	parentBlock := builder.L1.L1Backend.BlockChain().GetBlockByNumber(batchReceipt.BlockNumber.Uint64() - 1)
 	err = builder.L1.L1Backend.BlockChain().ReorgToOldBlock(parentBlock)
@@ -108,5 +108,5 @@ func TestMeaninglessBatchReorg(t *testing.T) {
 		Fatal(t, "L2 block hash changed")
 	}
 
-	compareAllMsgResultsFromConsensusAndExecution(t, builder.L2, "after reorg")
+	compareAllMsgResultsFromConsensusAndExecution(t, ctx, builder.L2, "after reorg")
 }
diff --git a/system_tests/overflow_assertions_test.go b/system_tests/overflow_assertions_test.go
index eb2bb01470..3a1cdc9e67 100644
--- a/system_tests/overflow_assertions_test.go
+++ b/system_tests/overflow_assertions_test.go
@@ -94,7 +94,7 @@ func TestOverflowAssertions(t *testing.T) {
 		l2node.InboxReader,
 		l2node.InboxTracker,
 		l2node.TxStreamer,
-		l2node.Execution,
+		l2node.ExecutionRecorder,
 		l2node.ArbDB,
 		nil,
 		StaticFetcherFrom(t, &blockValidatorConfig),
@@ -173,7 +173,7 @@ func TestOverflowAssertions(t *testing.T) {
 	t.Logf("Node batch count %d, msgs %d", bc, msgs)
 
 	// Wait for the node to catch up.
-	nodeExec, ok := l2node.Execution.(*gethexec.ExecutionNode)
+	nodeExec, ok := l2node.ExecutionClient.(*gethexec.ExecutionNode)
 	if !ok {
 		Fatal(t, "not geth execution node")
 	}
diff --git a/system_tests/reorg_resequencing_test.go b/system_tests/reorg_resequencing_test.go
index 70ab63bec1..42080222a2 100644
--- a/system_tests/reorg_resequencing_test.go
+++ b/system_tests/reorg_resequencing_test.go
@@ -69,7 +69,7 @@ func TestReorgResequencing(t *testing.T) {
 	Require(t, err)
 
 	verifyBalances("after empty reorg")
-	compareAllMsgResultsFromConsensusAndExecution(t, builder.L2, "after empty reorg")
+	compareAllMsgResultsFromConsensusAndExecution(t, ctx, builder.L2, "after empty reorg")
 
 	prevMessage, err := builder.L2.ConsensusNode.TxStreamer.GetMessage(startMsgCount - 1)
 	Require(t, err)
@@ -97,7 +97,7 @@ func TestReorgResequencing(t *testing.T) {
 	accountsWithBalance = append(accountsWithBalance, "User4")
 	verifyBalances("after reorg with new deposit")
-	compareAllMsgResultsFromConsensusAndExecution(t, builder.L2, "after reorg with new deposit")
+	compareAllMsgResultsFromConsensusAndExecution(t, ctx, builder.L2, "after reorg with new deposit")
 
 	err = builder.L2.ConsensusNode.TxStreamer.ReorgTo(startMsgCount)
 	Require(t, err)
@@ -106,5 +106,5 @@ func TestReorgResequencing(t *testing.T) {
 	Require(t, err)
 
 	verifyBalances("after second empty reorg")
-	compareAllMsgResultsFromConsensusAndExecution(t, builder.L2, "after second empty reorg")
+	compareAllMsgResultsFromConsensusAndExecution(t, ctx, builder.L2, "after second empty reorg")
 }
diff --git a/system_tests/seqfeed_test.go b/system_tests/seqfeed_test.go
index b757291561..31cf573123 100644
--- a/system_tests/seqfeed_test.go
+++ b/system_tests/seqfeed_test.go
@@ -149,10 +149,11 @@ func TestRelayedSequencerFeed(t *testing.T) {
 
 func compareAllMsgResultsFromConsensusAndExecution(
 	t *testing.T,
+	ctx context.Context,
 	testClient *TestClient,
 	testScenario string,
 ) *execution.MessageResult {
-	execHeadMsgNum, err := testClient.ExecNode.HeadMessageNumber()
+	execHeadMsgNum, err := testClient.ExecNode.HeadMessageNumber().Await(context.Background())
 	Require(t, err)
 	consensusMsgCount, err := testClient.ConsensusNode.TxStreamer.GetMessageCount()
 	Require(t, err)
@@ -166,7 +167,7 @@ func compareAllMsgResultsFromConsensusAndExecution(
 	var lastResult *execution.MessageResult
 	for msgCount := arbutil.MessageIndex(1); msgCount <= consensusMsgCount; msgCount++ {
 		pos := msgCount - 1
-		resultExec, err := testClient.ExecNode.ResultAtPos(arbutil.MessageIndex(pos))
+		resultExec, err := testClient.ExecNode.ResultAtPos(arbutil.MessageIndex(pos)).Await(ctx)
 		Require(t, err)
 
 		resultConsensus, err := testClient.ConsensusNode.TxStreamer.ResultAtCount(msgCount)
@@ -267,7 +268,7 @@ func testLyingSequencer(t *testing.T, dasModeStr string) {
 		t.Fatal("Unexpected balance:", l2balance)
 	}
 
-	fraudResult := compareAllMsgResultsFromConsensusAndExecution(t, testClientB, "fraud")
+	fraudResult := compareAllMsgResultsFromConsensusAndExecution(t, ctx, testClientB, "fraud")
 
 	// Send the real transaction to client A, will cause a reorg on nodeB
 	err = l2clientA.SendTransaction(ctx, realTx)
@@ -319,7 +320,7 @@ func testLyingSequencer(t *testing.T, dasModeStr string) {
 		t.Fatal("Consensus relied on execution database to return the result")
 	}
 	// Consensus should update message result stored in its database after a reorg
-	realResult := compareAllMsgResultsFromConsensusAndExecution(t, testClientB, "real")
+	realResult := compareAllMsgResultsFromConsensusAndExecution(t, ctx, testClientB, "real")
 	// Checks that results changed
 	if reflect.DeepEqual(fraudResult, realResult) {
 		t.Fatal("realResult and fraudResult are equal")