From 61a20c78254de161ac84164638b37a500716c97b Mon Sep 17 00:00:00 2001 From: Ng Wei Han <47109095+weiihann@users.noreply.github.com> Date: Mon, 30 Sep 2024 19:31:27 +0800 Subject: [PATCH] Add sync mode p2p CLI flag (#2186) * Add sync mode p2p CLI flag * Return struct instead of interface * Change flag usage string * Remove SnapServer interface * Revert makefile * Minor fixes * Update docs --- Makefile | 9 ++++-- cmd/juno/juno.go | 4 +++ node/node.go | 21 ++++++------- p2p/downloader.go | 74 ++++++++++++++++++++++++++++++++++++++++++++++ p2p/modes.go | 74 ++++++++++++++++++++++++++++++++++++++++++++++ p2p/p2p.go | 55 +++++++++++----------------------- p2p/p2p_test.go | 5 ++++ p2p/snap_server.go | 15 ++-------- p2p/snap_syncer.go | 58 ++++++++++++++++-------------------- p2p/sync.go | 52 ++++++++++++++++---------------- 10 files changed, 248 insertions(+), 119 deletions(-) create mode 100644 p2p/downloader.go create mode 100644 p2p/modes.go diff --git a/Makefile b/Makefile index 14efe4ad65..2497df5b26 100644 --- a/Makefile +++ b/Makefile @@ -115,7 +115,9 @@ feedernode: juno-cached --p2p-feeder-node \ --p2p-addr=/ip4/0.0.0.0/tcp/7777 \ --p2p-private-key="5f6cdc3aebcc74af494df054876100368ef6126e3a33fa65b90c765b381ffc37a0a63bbeeefab0740f24a6a38dabb513b9233254ad0020c721c23e69bc820089" \ - --metrics-port=9090 + --metrics-port=9090 \ + --pprof \ + --pprof-port=9095 node1: juno-cached # todo remove rm before merge @@ -129,7 +131,10 @@ node1: juno-cached --p2p-peers=/ip4/127.0.0.1/tcp/7777/p2p/12D3KooWLdURCjbp1D7hkXWk6ZVfcMDPtsNnPHuxoTcWXFtvrxGG \ --p2p-addr=/ip4/0.0.0.0/tcp/7778 \ --p2p-private-key="8aeffc26c3c371565dbe634c5248ae26f4fa5c33bc8f7328ac95e73fb94eaf263550f02449521f7cf64af17d248c5f170be46c06986a29803124c0819cb8fac3" \ - --metrics-port=9091 + --metrics-port=9091 \ + --pprof \ + --pprof-port=9096 \ + --p2p-sync-mode="snap" # --p2p-peers=/ip4/127.0.0.1/tcp/7778/p2p/12D3KooWDQVMmK6cQrfFcWUoFF8Ch5vYegfwiP5Do2SFC2NAXeBk \ diff --git a/cmd/juno/juno.go b/cmd/juno/juno.go index c61ee40529..75fb7d3fea 100644 --- a/cmd/juno/juno.go +++ b/cmd/juno/juno.go @@ -82,6 +82,7 @@ const ( callMaxStepsF = "rpc-call-max-steps" corsEnableF = "rpc-cors-enable" versionedConstantsFileF = "versioned-constants-file" + p2pSyncModeF = "p2p-sync-mode" defaultConfig = "" defaulHost = "localhost" @@ -119,6 +120,7 @@ const ( defaultGwTimeout = 5 * time.Second defaultCorsEnable = false defaultVersionedConstantsFile = "" + defaultP2pSyncMode = "full" configFlagUsage = "The YAML configuration file." logLevelFlagUsage = "Options: trace, debug, info, warn, error." @@ -152,6 +154,7 @@ const ( p2pFeederNodeUsage = "EXPERIMENTAL: Run juno as a feeder node which will only sync from feeder gateway and gossip the new" + " blocks to the network." p2pPrivateKeyUsage = "EXPERIMENTAL: Hexadecimal representation of a private key on the Ed25519 elliptic curve." + p2pSyncModeUsage = "EXPERIMENTAL: Synchronization mode: 'full' (default), 'snap'" metricsUsage = "Enables the Prometheus metrics endpoint on the default port." metricsHostUsage = "The interface on which the Prometheus endpoint will listen for requests." metricsPortUsage = "The port on which the Prometheus endpoint will listen for requests." @@ -335,6 +338,7 @@ func NewCmd(config *node.Config, run func(*cobra.Command, []string) error) *cobr junoCmd.Flags().String(p2pPeersF, defaultP2pPeers, p2pPeersUsage) junoCmd.Flags().Bool(p2pFeederNodeF, defaultP2pFeederNode, p2pFeederNodeUsage) junoCmd.Flags().String(p2pPrivateKey, defaultP2pPrivateKey, p2pPrivateKeyUsage) + junoCmd.Flags().String(p2pSyncModeF, defaultP2pSyncMode, p2pSyncModeUsage) junoCmd.Flags().Bool(metricsF, defaultMetrics, metricsUsage) junoCmd.Flags().String(metricsHostF, defaulHost, metricsHostUsage) junoCmd.Flags().Uint16(metricsPortF, defaultMetricsPort, metricsPortUsage) diff --git a/node/node.go b/node/node.go index eae0c9a5e9..a770c01f79 100644 --- a/node/node.go +++ b/node/node.go @@ -71,12 +71,13 @@ type Config struct { MetricsHost string `mapstructure:"metrics-host"` MetricsPort uint16 `mapstructure:"metrics-port"` - P2P bool `mapstructure:"p2p"` - P2PAddr string `mapstructure:"p2p-addr"` - P2PPublicAddr string `mapstructure:"p2p-public-addr"` - P2PPeers string `mapstructure:"p2p-peers"` - P2PFeederNode bool `mapstructure:"p2p-feeder-node"` - P2PPrivateKey string `mapstructure:"p2p-private-key"` + P2P bool `mapstructure:"p2p"` + P2PAddr string `mapstructure:"p2p-addr"` + P2PPublicAddr string `mapstructure:"p2p-public-addr"` + P2PPeers string `mapstructure:"p2p-peers"` + P2PFeederNode bool `mapstructure:"p2p-feeder-node"` + P2PPrivateKey string `mapstructure:"p2p-private-key"` + P2PSyncMode p2p.SyncMode `mapstructure:"p2p-sync-mode"` MaxVMs uint `mapstructure:"max-vms"` MaxVMQueue uint `mapstructure:"max-vm-queue"` @@ -127,7 +128,7 @@ func New(cfg *Config, version string) (*Node, error) { //nolint:gocyclo,funlen chain := blockchain.New(database, &cfg.Network) - //TODO: close a blockchain? better way? + // TODO: close a blockchain? better way? services = append(services, blockchain.NewBlockchainCloser(chain, log)) // Verify that cfg.Network is compatible with the database. @@ -170,12 +171,12 @@ func New(cfg *Config, version string) (*Node, error) { //nolint:gocyclo,funlen // Do not start the feeder synchronisation synchronizer = nil } - if os.Getenv("JUNO_P2P_NO_SYNC") != "" { + if os.Getenv("JUNO_P2P_NO_SYNC") != "" { // TODO(weiihann): remove this in the future log.Warnw("Got 'JUNO_P2P_NO_SYNC' to not syncing from p2p network") synchronizer = nil } p2pService, err = p2p.New(cfg.P2PAddr, cfg.P2PPublicAddr, version, cfg.P2PPeers, cfg.P2PPrivateKey, cfg.P2PFeederNode, - chain, &cfg.Network, log, database) + cfg.P2PSyncMode, chain, &cfg.Network, log, database) if err != nil { return nil, fmt.Errorf("set up p2p service: %w", err) } @@ -372,7 +373,7 @@ func (n *Node) Run(ctx context.Context) { } <-ctx.Done() - //TODO: chain.Close() - which service should do this? + // TODO: chain.Close() - which service should do this? n.log.Infow("Shutting down Juno...") } diff --git a/p2p/downloader.go b/p2p/downloader.go new file mode 100644 index 0000000000..8f61d5b354 --- /dev/null +++ b/p2p/downloader.go @@ -0,0 +1,74 @@ +package p2p + +import ( + "context" + "os" + "sync/atomic" + + "github.com/NethermindEth/juno/blockchain" + "github.com/NethermindEth/juno/sync" + "github.com/NethermindEth/juno/utils" + "github.com/libp2p/go-libp2p/core/host" +) + +type Downloader struct { + isFeeder bool + mode atomic.Uint32 + baseSyncer *SyncService + snapSyncer *SnapSyncer + log utils.SimpleLogger +} + +func NewDownloader(isFeeder bool, syncMode SyncMode, p2pHost host.Host, network *utils.Network, bc *blockchain.Blockchain, log utils.SimpleLogger) *Downloader { + dl := &Downloader{ + isFeeder: isFeeder, + log: log, + } + + dl.baseSyncer = newSyncService(bc, p2pHost, network, log) + + var snapSyncer *SnapSyncer + if syncMode == SnapSync { + snapSyncer = NewSnapSyncer(dl.baseSyncer.Client(), bc, log) + } + dl.snapSyncer = snapSyncer + + // TODO: when syncing becomes more mature, we need a way to dynamically determine which sync mode to use + // For now, we will use the sync mode that is passed in the constructor + dl.mode.Store(uint32(syncMode)) + + return dl +} + +func (d *Downloader) Start(ctx context.Context) error { + // Feeder node doesn't sync using P2P + if d.isFeeder { + return nil + } + + d.log.Infow("Downloader start", "mode", d.getMode()) + if d.getMode() == SnapSync { + // TODO: a hack, remove this + if os.Getenv("JUNO_P2P_NO_SYNC") == "" { + err := d.snapSyncer.Run(ctx) + if err != nil { + d.log.Errorw("Snapsyncer failed to start") + return err + } + } else { + d.log.Infow("Syncing is disabled") + return nil + } + } + + d.baseSyncer.Start(ctx) + return nil +} + +func (d *Downloader) getMode() SyncMode { + return SyncMode(d.mode.Load()) +} + +func (d *Downloader) WithListener(l sync.EventListener) { + d.baseSyncer.WithListener(l) +} diff --git a/p2p/modes.go b/p2p/modes.go new file mode 100644 index 0000000000..ef77530adb --- /dev/null +++ b/p2p/modes.go @@ -0,0 +1,74 @@ +package p2p + +import ( + "encoding" + "fmt" + + "github.com/spf13/pflag" +) + +// The following are necessary for Cobra and Viper, respectively, to unmarshal +// CLI/config parameters properly. +var ( + _ pflag.Value = (*SyncMode)(nil) + _ encoding.TextUnmarshaler = (*SyncMode)(nil) +) + +// SyncMode represents the synchronisation mode of the downloader. +// It is a uint32 as it is used with atomic operations. +type SyncMode uint32 + +const ( + FullSync SyncMode = iota // Synchronize by downloading blocks and applying them to the chain sequentially + SnapSync // Download the chain and the state via snap protocol +) + +func (s SyncMode) IsValid() bool { + return s == FullSync || s == SnapSync +} + +func (s SyncMode) String() string { + switch s { + case FullSync: + return "full" + case SnapSync: + return "snap" + default: + return "unknown" + } +} + +func (s SyncMode) Type() string { + return "SyncMode" +} + +func (s SyncMode) MarshalYAML() (interface{}, error) { + return s.String(), nil +} + +func (s *SyncMode) Set(mode string) error { + switch mode { + case "full": + *s = FullSync + case "snap": + *s = SnapSync + default: + return fmt.Errorf("unknown sync mode %q, want \"full\" or \"snap\"", mode) + } + return nil +} + +func (s SyncMode) MarshalText() ([]byte, error) { + switch s { + case FullSync: + return []byte("full"), nil + case SnapSync: + return []byte("snap"), nil + default: + return nil, fmt.Errorf("unknown sync mode %d", s) + } +} + +func (s *SyncMode) UnmarshalText(text []byte) error { + return s.Set(string(text)) +} diff --git a/p2p/p2p.go b/p2p/p2p.go index 7706a06781..79a7d17577 100644 --- a/p2p/p2p.go +++ b/p2p/p2p.go @@ -5,9 +5,7 @@ import ( "encoding/hex" "errors" "fmt" - "github.com/NethermindEth/juno/service" "math/rand" - "os" "strings" "sync" "time" @@ -50,15 +48,11 @@ type Service struct { topics map[string]*pubsub.Topic topicsLock sync.RWMutex - synchroniser *syncService - snapSyncher service.Service - //snapServer *snapServer - - feederNode bool + downloader *Downloader database db.DB } -func New(addr, publicAddr, version, peers, privKeyStr string, feederNode bool, bc *blockchain.Blockchain, snNetwork *utils.Network, +func New(addr, publicAddr, version, peers, privKeyStr string, feederNode bool, syncMode SyncMode, bc *blockchain.Blockchain, snNetwork *utils.Network, log utils.SimpleLogger, database db.DB, ) (*Service, error) { if addr == "" { @@ -116,10 +110,10 @@ func New(addr, publicAddr, version, peers, privKeyStr string, feederNode bool, b // Todo: try to understand what will happen if user passes a multiaddr with p2p public and a private key which doesn't match. // For example, a user passes the following multiaddr: --p2p-addr=/ip4/0.0.0.0/tcp/7778/p2p/(SomePublicKey) and also passes a // --p2p-private-key="SomePrivateKey". However, the private public key pair don't match, in this case what will happen? - return NewWithHost(p2pHost, peers, feederNode, bc, snNetwork, log, database) + return NewWithHost(p2pHost, peers, feederNode, syncMode, bc, snNetwork, log, database) } -func NewWithHost(p2phost host.Host, peers string, feederNode bool, bc *blockchain.Blockchain, snNetwork *utils.Network, +func NewWithHost(p2phost host.Host, peers string, feederNode bool, syncMode SyncMode, bc *blockchain.Blockchain, snNetwork *utils.Network, log utils.SimpleLogger, database db.DB, ) (*Service, error) { var ( @@ -150,22 +144,19 @@ func NewWithHost(p2phost host.Host, peers string, feederNode bool, bc *blockchai return nil, err } - // todo: reconsider initialising synchroniser here because if node is a feedernode we should not create an instance of it. - - synchroniser := newSyncService(bc, p2phost, snNetwork, log) + downloader := NewDownloader(feederNode, syncMode, p2phost, snNetwork, bc, log) handler := starknet.NewHandler(bc, log) - handler.WithSnapsyncSupport(NewSnapServer(bc, log)) + handler.WithSnapsyncSupport(NewSnapServer(bc, log)) // TODO: initialize the snap server in the starknet handler + s := &Service{ - synchroniser: synchroniser, - snapSyncher: NewSnapSyncer(synchroniser, bc, log), - log: log, - host: p2phost, - network: snNetwork, - dht: p2pdht, - feederNode: feederNode, - topics: make(map[string]*pubsub.Topic), - handler: handler, - database: database, + downloader: downloader, + log: log, + host: p2phost, + network: snNetwork, + dht: p2pdht, + topics: make(map[string]*pubsub.Topic), + handler: handler, + database: database, } return s, nil } @@ -264,18 +255,8 @@ func (s *Service) Run(ctx context.Context) error { s.setProtocolHandlers() - if !s.feederNode { - //s.synchroniser.start(ctx) - if os.Getenv("JUNO_P2P_NO_SYNC") == "" { - err := s.snapSyncher.Run(ctx) - if err != nil { - s.log.Errorw("Snapsyncer failed to start") - return err - } - } else { - s.log.Infow("Syncing is disabled") - } - } + // Start the syncing process + s.downloader.Start(ctx) <-ctx.Done() if err := s.persistPeers(); err != nil { @@ -416,7 +397,7 @@ func (s *Service) SetProtocolHandler(pid protocol.ID, handler func(network.Strea func (s *Service) WithListener(l junoSync.EventListener) { runMetrics(s.host.Peerstore()) - s.synchroniser.WithListener(l) + s.downloader.WithListener(l) } // persistPeers stores the given peers in the peers database diff --git a/p2p/p2p_test.go b/p2p/p2p_test.go index 070a9eedb8..0dd493027b 100644 --- a/p2p/p2p_test.go +++ b/p2p/p2p_test.go @@ -34,6 +34,7 @@ func TestService(t *testing.T) { peerHosts[0], "", false, + p2p.FullSync, nil, &utils.Integration, utils.NewNopZapLogger(), @@ -56,6 +57,7 @@ func TestService(t *testing.T) { peerHosts[1], strings.Join(peerAddrsString, ","), true, + p2p.FullSync, nil, &utils.Integration, utils.NewNopZapLogger(), @@ -144,6 +146,7 @@ func TestInvalidKey(t *testing.T) { "", "something", false, + p2p.FullSync, nil, &utils.Integration, utils.NewNopZapLogger(), @@ -162,6 +165,7 @@ func TestValidKey(t *testing.T) { "", "08011240333b4a433f16d7ca225c0e99d0d8c437b835cb74a98d9279c561977690c80f681b25ccf3fa45e2f2de260149c112fa516b69057dd3b0151a879416c0cb12d9b3", false, + p2p.FullSync, nil, &utils.Integration, utils.NewNopZapLogger(), @@ -199,6 +203,7 @@ func TestLoadAndPersistPeers(t *testing.T) { "", "5f6cdc3aebcc74af494df054876100368ef6126e3a33fa65b90c765b381ffc37a0a63bbeeefab0740f24a6a38dabb513b9233254ad0020c721c23e69bc820089", false, + p2p.FullSync, nil, &utils.Integration, utils.NewNopZapLogger(), diff --git a/p2p/snap_server.go b/p2p/snap_server.go index c9e3390b57..dc30fe684a 100644 --- a/p2p/snap_server.go +++ b/p2p/snap_server.go @@ -1,9 +1,10 @@ package p2p import ( + "math/big" + "github.com/NethermindEth/juno/utils" "google.golang.org/protobuf/proto" - "math/big" "github.com/NethermindEth/juno/adapters/core2p2p" "github.com/NethermindEth/juno/adapters/p2p2core" @@ -44,14 +45,6 @@ type ClassRangeStreamingResult struct { RangeProof *spec.PatriciaRangeProof } -// TODO: delete, duplicate of SnapProvider -type SnapServer interface { - GetClassRange(request *spec.ClassRangeRequest) (iter.Seq[proto.Message], error) - GetContractRange(request *spec.ContractRangeRequest) (iter.Seq[proto.Message], error) - GetStorageRange(request *spec.ContractStorageRequest) (iter.Seq[proto.Message], error) - GetClasses(request *spec.ClassHashesRequest) (iter.Seq[proto.Message], error) -} - type SnapServerBlockchain interface { GetStateForStateRoot(stateRoot *felt.Felt) (*core.State, error) GetClasses(felts []*felt.Felt) ([]core.Class, error) @@ -61,7 +54,7 @@ type yieldFunc = func(proto.Message) bool var _ SnapServerBlockchain = (*blockchain.Blockchain)(nil) -func NewSnapServer(blockchain SnapServerBlockchain, log utils.SimpleLogger) SnapServer { +func NewSnapServer(blockchain SnapServerBlockchain, log utils.SimpleLogger) *snapServer { return &snapServer{ log: log, blockchain: blockchain, @@ -246,7 +239,6 @@ func (b *snapServer) GetContractRange(request *spec.ContractRangeRequest) (iter. }) return nil }) - if err != nil { log.Error("error iterating storage trie", "err", err) return @@ -335,7 +327,6 @@ func (b *snapServer) GetStorageRange(request *spec.ContractStorageRequest) (iter } return true }) - if err != nil { log.Error("error handling storage range request", "err", err) return diff --git a/p2p/snap_syncer.go b/p2p/snap_syncer.go index 50810d6d34..6bfc607708 100644 --- a/p2p/snap_syncer.go +++ b/p2p/snap_syncer.go @@ -4,7 +4,6 @@ import ( "context" "errors" "fmt" - "github.com/NethermindEth/juno/p2p/starknet" big "math/big" "sync" "sync/atomic" @@ -17,6 +16,7 @@ import ( "github.com/NethermindEth/juno/core/crypto" "github.com/NethermindEth/juno/core/felt" "github.com/NethermindEth/juno/core/trie" + "github.com/NethermindEth/juno/p2p/starknet" "github.com/NethermindEth/juno/p2p/starknet/spec" "github.com/NethermindEth/juno/service" "github.com/NethermindEth/juno/starknetdata" @@ -38,8 +38,7 @@ type Blockchain interface { var _ Blockchain = (*blockchain.Blockchain)(nil) -type SnapSyncher struct { - baseSync *syncService +type SnapSyncer struct { starknetData starknetdata.StarknetData client *starknet.Client blockchain Blockchain @@ -65,7 +64,7 @@ type SnapSyncher struct { mtxL *sync.Mutex } -var _ service.Service = (*SnapSyncher)(nil) +var _ service.Service = (*SnapSyncer)(nil) type storageRangeJob struct { path *felt.Felt @@ -76,12 +75,12 @@ type storageRangeJob struct { } func NewSnapSyncer( - baseSyncher *syncService, + client *starknet.Client, bc *blockchain.Blockchain, log utils.SimpleLogger, -) *SnapSyncher { - return &SnapSyncher{ - baseSync: baseSyncher, +) *SnapSyncer { + return &SnapSyncer{ + client: client, blockchain: bc, log: log, } @@ -130,7 +129,7 @@ var ( newPivotHeadDistance = uint64(0) // This should be the reorg depth ) -func (s *SnapSyncher) Run(ctx context.Context) error { +func (s *SnapSyncer) Run(ctx context.Context) error { s.log.Infow("starting snap sync") // 1. Get the current head // 2. Start the snap sync with pivot set to that head @@ -143,11 +142,6 @@ func (s *SnapSyncher) Run(ctx context.Context) error { // 6. Probably download old state updato/bodies too // 7. Send back control to base sync. - // TODO: hacky client - if s.baseSync == nil { - panic("can't start snap syncer without base syncer") - } - s.client = s.baseSync.Client() s.starknetData = &MockStarkData{} err := s.runPhase1(ctx) @@ -164,11 +158,11 @@ func (s *SnapSyncher) Run(ctx context.Context) error { return nil // TODO: start p2p syncer - //s.baseSync.start(ctx) + // s.baseSync.start(ctx) } //nolint:gocyclo,nolintlint -func (s *SnapSyncher) runPhase1(ctx context.Context) error { +func (s *SnapSyncer) runPhase1(ctx context.Context) error { starttime := time.Now() err := s.initState(ctx) @@ -291,7 +285,7 @@ func (s *SnapSyncher) runPhase1(ctx context.Context) error { return nil } -func (s *SnapSyncher) PhraseVerify(ctx context.Context) error { +func (s *SnapSyncer) PhraseVerify(ctx context.Context) error { // 1. Get the correct tries roots (again) iter, err := s.client.RequestContractRange(ctx, &spec.ContractRangeRequest{ StateRoot: core2p2p.AdaptHash(s.currentGlobalStateRoot), @@ -348,7 +342,7 @@ func (s *SnapSyncher) PhraseVerify(ctx context.Context) error { return nil } -func (s *SnapSyncher) getNextStartingBlock(ctx context.Context) (*core.Block, error) { +func (s *SnapSyncer) getNextStartingBlock(ctx context.Context) (*core.Block, error) { for { select { case <-ctx.Done(): @@ -371,7 +365,7 @@ func (s *SnapSyncher) getNextStartingBlock(ctx context.Context) (*core.Block, er } } -func (s *SnapSyncher) initState(ctx context.Context) error { +func (s *SnapSyncer) initState(ctx context.Context) error { startingBlock, err := s.getNextStartingBlock(ctx) if err != nil { return errors.Join(err, errors.New("error getting current head")) @@ -410,7 +404,7 @@ func CalculatePercentage(f *felt.Felt) uint64 { } //nolint:gocyclo,nolintlint -func (s *SnapSyncher) runClassRangeWorker(ctx context.Context) error { +func (s *SnapSyncer) runClassRangeWorker(ctx context.Context) error { totalAdded := 0 completed := false startAddr := &felt.Zero @@ -542,7 +536,7 @@ func (s *SnapSyncher) runClassRangeWorker(ctx context.Context) error { } //nolint:gocyclo -func (s *SnapSyncher) runFetchClassWorker(ctx context.Context, workerIdx int) error { +func (s *SnapSyncer) runFetchClassWorker(ctx context.Context, workerIdx int) error { keyBatches := make([]*felt.Felt, 0) s.log.Infow("class fetch worker entering infinite loop", "worker", workerIdx) for { @@ -674,7 +668,7 @@ func (s *SnapSyncher) runFetchClassWorker(ctx context.Context, workerIdx int) er } //nolint:gocyclo -func (s *SnapSyncher) runContractRangeWorker(ctx context.Context) error { +func (s *SnapSyncer) runContractRangeWorker(ctx context.Context) error { totalAdded := 0 startAddr := &felt.Zero completed := false @@ -809,7 +803,7 @@ func (s *SnapSyncher) runContractRangeWorker(ctx context.Context) error { } //nolint:funlen,gocyclo -func (s *SnapSyncher) runStorageRangeWorker(ctx context.Context, workerIdx int) error { +func (s *SnapSyncer) runStorageRangeWorker(ctx context.Context, workerIdx int) error { nextjobs := make([]*storageRangeJob, 0) s.log.Infow("storage range worker entering infinite loop", "worker", workerIdx) for { @@ -838,7 +832,7 @@ func (s *SnapSyncher) runStorageRangeWorker(ctx context.Context, workerIdx int) } } - //s.log.Infow("storage range job completes batch", "jobs", len(jobs), "worker", workerIdx, "pending", s.storageRangeJobCount) + // s.log.Infow("storage range job completes batch", "jobs", len(jobs), "worker", workerIdx, "pending", s.storageRangeJobCount) requests := make([]*spec.StorageRangeQuery, 0) for _, job := range jobs { @@ -1024,7 +1018,7 @@ func (s *SnapSyncher) runStorageRangeWorker(ctx context.Context, workerIdx int) } //nolint:gocyclo -func (s *SnapSyncher) runStorageRefreshWorker(ctx context.Context) error { +func (s *SnapSyncer) runStorageRefreshWorker(ctx context.Context) error { // In ethereum, this is normally done with get tries, but since we don't have that here, we'll have to be // creative. This does mean that this is impressively inefficient. var job *storageRangeJob @@ -1142,7 +1136,7 @@ func (s *SnapSyncher) runStorageRefreshWorker(ctx context.Context) error { return nil } -func (s *SnapSyncher) queueClassJob(ctx context.Context, classHash *felt.Felt) error { +func (s *SnapSyncer) queueClassJob(ctx context.Context, classHash *felt.Felt) error { queued := false for !queued { select { @@ -1158,7 +1152,7 @@ func (s *SnapSyncher) queueClassJob(ctx context.Context, classHash *felt.Felt) e return nil } -func (s *SnapSyncher) queueStorageRangeJob(ctx context.Context, path, storageRoot, classHash *felt.Felt, nonce uint64) error { +func (s *SnapSyncer) queueStorageRangeJob(ctx context.Context, path, storageRoot, classHash *felt.Felt, nonce uint64) error { return s.queueStorageRangeJobJob(ctx, &storageRangeJob{ path: path, storageRoot: storageRoot, @@ -1168,7 +1162,7 @@ func (s *SnapSyncher) queueStorageRangeJob(ctx context.Context, path, storageRoo }) } -func (s *SnapSyncher) queueStorageRangeJobJob(ctx context.Context, job *storageRangeJob) error { +func (s *SnapSyncer) queueStorageRangeJobJob(ctx context.Context, job *storageRangeJob) error { if job.storageRoot == nil || job.storageRoot.IsZero() { // contract's with storage root of 0x0 has no storage return nil @@ -1189,7 +1183,7 @@ func (s *SnapSyncher) queueStorageRangeJobJob(ctx context.Context, job *storageR return nil } -func (s *SnapSyncher) queueStorageRefreshJob(ctx context.Context, job *storageRangeJob) error { +func (s *SnapSyncer) queueStorageRefreshJob(ctx context.Context, job *storageRangeJob) error { queued := false for !queued { select { @@ -1204,7 +1198,7 @@ func (s *SnapSyncher) queueStorageRefreshJob(ctx context.Context, job *storageRa return nil } -func (s *SnapSyncher) poolLatestBlock(ctx context.Context) error { +func (s *SnapSyncer) poolLatestBlock(ctx context.Context) error { for { select { case <-ctx.Done(): @@ -1237,7 +1231,7 @@ func (s *SnapSyncher) poolLatestBlock(ctx context.Context) error { } } -func (s *SnapSyncher) ApplyStateUpdate(blockNumber uint64) error { +func (s *SnapSyncer) ApplyStateUpdate(blockNumber uint64) error { return errors.New("unimplemented") } @@ -1271,7 +1265,7 @@ func P2pProofToTrieProofs(proof *spec.PatriciaRangeProof) []trie.ProofNode { } func VerifyGlobalStateRoot(globalStateRoot, classRoot, storageRoot *felt.Felt) error { - var stateVersion = new(felt.Felt).SetBytes([]byte(`STARKNET_STATE_V0`)) + stateVersion := new(felt.Felt).SetBytes([]byte(`STARKNET_STATE_V0`)) if classRoot.IsZero() { if globalStateRoot.Equal(storageRoot) { diff --git a/p2p/sync.go b/p2p/sync.go index 99468a8a31..c335adb227 100644 --- a/p2p/sync.go +++ b/p2p/sync.go @@ -26,7 +26,7 @@ import ( "go.uber.org/zap" ) -type syncService struct { +type SyncService struct { host host.Host network *utils.Network client *starknet.Client // todo: merge all the functionality of Client with p2p SyncService @@ -36,29 +36,29 @@ type syncService struct { log utils.SimpleLogger } -func newSyncService(bc *blockchain.Blockchain, h host.Host, n *utils.Network, log utils.SimpleLogger) *syncService { - return &syncService{ +func newSyncService(bc *blockchain.Blockchain, h host.Host, n *utils.Network, log utils.SimpleLogger) *SyncService { + s := &SyncService{ host: h, network: n, blockchain: bc, log: log, listener: &junoSync.SelectiveListener{}, } + + s.client = starknet.NewClient(s.randomPeerStream, s.network, s.log) + + return s } -// Client is a nasty hack to provide `CLient` to `SnapSyncher` -// TODO: clean this -func (s *syncService) Client() *starknet.Client { - return starknet.NewClient(s.randomPeerStream, s.network, s.log) +func (s *SyncService) Client() *starknet.Client { + return s.client } //nolint:funlen -func (s *syncService) start(ctx context.Context) { +func (s *SyncService) Start(ctx context.Context) { ctx, cancel := context.WithCancel(ctx) defer cancel() - s.client = starknet.NewClient(s.randomPeerStream, s.network, s.log) - for i := 0; ; i++ { if err := ctx.Err(); err != nil { break @@ -149,7 +149,7 @@ func specBlockPartsFunc[T specBlockHeaderAndSigs | specTxWithReceipts | specEven return specBlockParts(i) } -func (s *syncService) logError(msg string, err error) { +func (s *SyncService) logError(msg string, err error) { if !errors.Is(err, context.Canceled) { var log utils.SimpleLogger if v, ok := s.log.(*utils.ZapLogger); ok { @@ -175,7 +175,7 @@ type blockBody struct { } //nolint:gocyclo -func (s *syncService) processSpecBlockParts( +func (s *SyncService) processSpecBlockParts( ctx context.Context, startingBlockNum uint64, specBlockPartsCh <-chan specBlockParts, ) <-chan <-chan blockBody { orderedBlockBodiesCh := make(chan (<-chan blockBody)) @@ -268,7 +268,7 @@ func (s *syncService) processSpecBlockParts( } //nolint:gocyclo,funlen -func (s *syncService) adaptAndSanityCheckBlock(ctx context.Context, header *spec.SignedBlockHeader, contractDiffs []*spec.ContractDiff, +func (s *SyncService) adaptAndSanityCheckBlock(ctx context.Context, header *spec.SignedBlockHeader, contractDiffs []*spec.ContractDiff, classes []*spec.Class, txs []*spec.Transaction, receipts []*spec.Receipt, events []*spec.Event, prevBlockRoot *felt.Felt, ) <-chan blockBody { bodyCh := make(chan blockBody) @@ -404,7 +404,7 @@ func (s specBlockHeaderAndSigs) blockNumber() uint64 { return s.header.Number } -func (s *syncService) genHeadersAndSigs(ctx context.Context, blockNumber uint64) (<-chan specBlockHeaderAndSigs, error) { +func (s *SyncService) genHeadersAndSigs(ctx context.Context, blockNumber uint64) (<-chan specBlockHeaderAndSigs, error) { it := s.createIteratorForBlock(blockNumber) headersIt, err := s.client.RequestBlockHeaders(ctx, &spec.BlockHeadersRequest{Iteration: it}) if err != nil { @@ -449,7 +449,7 @@ func (s specClasses) blockNumber() uint64 { return s.number } -func (s *syncService) genClasses(ctx context.Context, blockNumber uint64) (<-chan specClasses, error) { +func (s *SyncService) genClasses(ctx context.Context, blockNumber uint64) (<-chan specClasses, error) { it := s.createIteratorForBlock(blockNumber) classesIt, err := s.client.RequestClasses(ctx, &spec.ClassesRequest{Iteration: it}) if err != nil { @@ -495,7 +495,7 @@ func (s specContractDiffs) blockNumber() uint64 { return s.number } -func (s *syncService) genStateDiffs(ctx context.Context, blockNumber uint64) (<-chan specContractDiffs, error) { +func (s *SyncService) genStateDiffs(ctx context.Context, blockNumber uint64) (<-chan specContractDiffs, error) { it := s.createIteratorForBlock(blockNumber) stateDiffsIt, err := s.client.RequestStateDiffs(ctx, &spec.StateDiffsRequest{Iteration: it}) if err != nil { @@ -543,7 +543,7 @@ func (s specEvents) blockNumber() uint64 { return s.number } -func (s *syncService) genEvents(ctx context.Context, blockNumber uint64) (<-chan specEvents, error) { +func (s *SyncService) genEvents(ctx context.Context, blockNumber uint64) (<-chan specEvents, error) { it := s.createIteratorForBlock(blockNumber) eventsIt, err := s.client.RequestEvents(ctx, &spec.EventsRequest{Iteration: it}) if err != nil { @@ -589,7 +589,7 @@ func (s specTxWithReceipts) blockNumber() uint64 { return s.number } -func (s *syncService) genTransactions(ctx context.Context, blockNumber uint64) (<-chan specTxWithReceipts, error) { +func (s *SyncService) genTransactions(ctx context.Context, blockNumber uint64) (<-chan specTxWithReceipts, error) { it := s.createIteratorForBlock(blockNumber) txsIt, err := s.client.RequestTransactions(ctx, &spec.TransactionsRequest{Iteration: it}) if err != nil { @@ -634,7 +634,7 @@ func (s *syncService) genTransactions(ctx context.Context, blockNumber uint64) ( return txsCh, nil } -func (s *syncService) randomPeer() peer.ID { +func (s *SyncService) randomPeer() peer.ID { peers := s.host.Peerstore().Peers() // todo do not request same block from all peers @@ -647,15 +647,15 @@ func (s *syncService) randomPeer() peer.ID { p := peers[rand.Intn(len(peers))] //nolint:gosec - //s.log.Debugw("Number of peers", "len", len(peers)) - //s.log.Debugw("Random chosen peer's info", "peerInfo", s.host.Peerstore().PeerInfo(p)) + // s.log.Debugw("Number of peers", "len", len(peers)) + // s.log.Debugw("Random chosen peer's info", "peerInfo", s.host.Peerstore().PeerInfo(p)) return p } var errNoPeers = errors.New("no peers available") -func (s *syncService) randomPeerStream(ctx context.Context, pids ...protocol.ID) (network.Stream, error) { +func (s *SyncService) randomPeerStream(ctx context.Context, pids ...protocol.ID) (network.Stream, error) { randPeer := s.randomPeer() if randPeer == "" { return nil, errNoPeers @@ -669,13 +669,13 @@ func (s *syncService) randomPeerStream(ctx context.Context, pids ...protocol.ID) return stream, err } -func (s *syncService) removePeer(id peer.ID) { +func (s *SyncService) removePeer(id peer.ID) { s.log.Debugw("Removing peer", "peerID", id) s.host.Peerstore().RemovePeer(id) s.host.Peerstore().ClearAddrs(id) } -func (s *syncService) createIteratorForBlock(blockNumber uint64) *spec.Iteration { +func (s *SyncService) createIteratorForBlock(blockNumber uint64) *spec.Iteration { return &spec.Iteration{ Start: &spec.Iteration_BlockNumber{BlockNumber: blockNumber}, Direction: spec.Iteration_Forward, @@ -684,12 +684,12 @@ func (s *syncService) createIteratorForBlock(blockNumber uint64) *spec.Iteration } } -func (s *syncService) WithListener(l junoSync.EventListener) { +func (s *SyncService) WithListener(l junoSync.EventListener) { s.listener = l } //nolint:unused -func (s *syncService) sleep(d time.Duration) { +func (s *SyncService) sleep(d time.Duration) { s.log.Debugw("Sleeping...", "for", d) time.Sleep(d) }