Skip to content

Commit

Permalink
Netmap with nodev2 (#3088)
Browse files Browse the repository at this point in the history
Fixes #3047, fixes #1914.
  • Loading branch information
roman-khimov authored Feb 6, 2025
2 parents 5279df2 + 74d8031 commit 3e0d534
Show file tree
Hide file tree
Showing 36 changed files with 558 additions and 234 deletions.
8 changes: 8 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ Changelog for NeoFS Node
### Changed
- Number of concurrently handled notifications from the chain was increased from 10 to 300 for IR (#3068)
- Write-cache size estimations (#3106)
- Support for the new network map, which lifts the limit of ~320 nodes per network (#3088)

### Removed
- Drop creating new eacl tables with public keys (#3096)
Expand All @@ -43,6 +44,13 @@ to the BoltDB part of the write-cache, which is dropped from the code.
Also, because of this, there will be automatic migration from BoltDB by flushing
objects to the main storage and removing database file.

This version maintains two network map lists: the old one is used by default,
and the new one is used once the "UseNodeV2" network-wide setting is set to a
non-zero value. Storage nodes add their records to both lists by default, so
IR nodes must be updated first; otherwise SNs will fail to bootstrap. Monitor
candidates with neofs-adm and make the switch once all nodes are properly
migrated to the new list.

## [0.44.2] - 2024-12-20

### Fixed
Expand Down
81 changes: 70 additions & 11 deletions cmd/neofs-adm/internal/modules/fschain/netmap_candidates.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
package fschain

import (
"encoding/hex"
"fmt"

"github.com/nspcc-dev/neo-go/pkg/rpcclient/invoker"
netmaprpc "github.com/nspcc-dev/neofs-contract/rpc/netmap"
"github.com/nspcc-dev/neofs-contract/rpc/nns"
"github.com/nspcc-dev/neofs-node/cmd/internal/cmdprinter"
"github.com/nspcc-dev/neofs-node/pkg/morph/client/netmap"
Expand All @@ -29,22 +31,79 @@ func listNetmapCandidatesNodes(cmd *cobra.Command, _ []string) error {
return fmt.Errorf("can't get netmap contract hash: %w", err)
}

res, err := inv.Call(nmHash, "netmapCandidates")
useV2, _ := cmd.Flags().GetBool(nodeV2Flag)

if !useV2 {
res, err := inv.Call(nmHash, "netmapCandidates")
if err != nil {
return fmt.Errorf("can't fetch list of network config keys from the netmap contract: %w", err)
}
if res.State != "HALT" {
return fmt.Errorf("netmap contract returned unexpected exception: %s", res.FaultException)
}

nm, err := netmap.DecodeNetMap(res.Stack)

if err != nil {
return fmt.Errorf("unable to decode netmap: %w", err)
}
for i, n := range nm.Nodes() {
cmdprinter.PrettyPrintNodeInfo(cmd, n, i, "", false)
}
return nil
}
var (
nodes []netmaprpc.NetmapCandidate
reader = netmaprpc.NewReader(inv, nmHash)
)
sess, iter, err := reader.ListCandidates()
if err != nil {
return fmt.Errorf("can't fetch list of network config keys from the netmap contract: %w", err)
return fmt.Errorf("can't list candidates: %w", err)
}
if res.State != "HALT" {
return fmt.Errorf("netmap contract returned unexpected exception: %s", res.FaultException)
defer func() {
_ = inv.TerminateSession(sess)
}()
items, err := inv.TraverseIterator(sess, &iter, 0)
for err == nil && len(items) > 0 {
for _, itm := range items {
var (
c netmaprpc.NetmapCandidate
err = c.FromStackItem(itm)
)
if err != nil {
return fmt.Errorf("can't decode candidate: %w", err)
}
nodes = append(nodes, c)
}
items, err = inv.TraverseIterator(sess, &iter, 0)
}

nm, err := netmap.DecodeNetMap(res.Stack)

if err != nil {
return fmt.Errorf("unable to decode netmap: %w", err)
return fmt.Errorf("can't fetch candidates: %w", err)
}
nodes := nm.Nodes()
for i := range nodes {
cmdprinter.PrettyPrintNodeInfo(cmd, nodes[i], i, "", false)
for i, n := range nodes {
var strState string

switch {
case n.State.Cmp(netmaprpc.NodeStateOnline) == 0:
strState = "ONLINE"
case n.State.Cmp(netmaprpc.NodeStateOffline) == 0:
strState = "OFFLINE"
case n.State.Cmp(netmaprpc.NodeStateMaintenance) == 0:
strState = "MAINTENANCE"
default:
strState = "STATE_UNSUPPORTED"
}

cmd.Printf("Node %d: %s %s (last active: %d) ", i+1, hex.EncodeToString(n.Key.Bytes()), strState, n.LastActiveEpoch.Int64())

for j := range n.Addresses {
cmd.Printf("%s ", n.Addresses[j])
}
cmd.Println()

for k, v := range n.Attributes {
cmd.Printf("\t%s: %s\n", k, v)
}
}
return nil
}
3 changes: 2 additions & 1 deletion cmd/neofs-adm/internal/modules/fschain/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ const (
estimationsContainerFlag = "cid"
mintNeofsAmountFlag = "amount"
mintTxHashFlag = "deposit-tx"
nodeV2Flag = "nodev2"
)

var (
Expand Down Expand Up @@ -326,7 +327,6 @@ Values for unknown keys are added exactly the way they're provided, no conversio
netmapCandidatesCmd = &cobra.Command{
Use: "netmap-candidates",
Short: "List netmap candidates nodes",
Args: cobra.NoArgs,
PreRun: func(cmd *cobra.Command, _ []string) {
_ = viper.BindPFlag(endpointFlag, cmd.Flags().Lookup(endpointFlag))
},
Expand Down Expand Up @@ -481,6 +481,7 @@ func init() {

RootCmd.AddCommand(netmapCandidatesCmd)
netmapCandidatesCmd.Flags().StringP(endpointFlag, "r", "", "N3 RPC node endpoint")
netmapCandidatesCmd.Flags().BoolP(nodeV2Flag, "2", false, "Use node v2 data")

RootCmd.AddCommand(estimationsCmd)
ff := estimationsCmd.Flags()
Expand Down
4 changes: 2 additions & 2 deletions cmd/neofs-node/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -264,8 +264,8 @@ func newCachedNetmapStorage(s netmap.State, v netmap.Source) *lruNetmapSource {
}
}

func (s *lruNetmapSource) GetNetMap(diff uint64) (*netmapSDK.NetMap, error) {
return s.getNetMapByEpoch(s.netState.CurrentEpoch() - diff)
func (s *lruNetmapSource) NetMap() (*netmapSDK.NetMap, error) {
return s.getNetMapByEpoch(s.netState.CurrentEpoch())
}

func (s *lruNetmapSource) GetNetMapByEpoch(epoch uint64) (*netmapSDK.NetMap, error) {
Expand Down
57 changes: 28 additions & 29 deletions cmd/neofs-node/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"errors"
"fmt"
"io/fs"
"math/big"
"net"
"os"
"os/signal"
Expand All @@ -16,6 +17,7 @@ import (

"github.com/nspcc-dev/neo-go/pkg/crypto/keys"
neogoutil "github.com/nspcc-dev/neo-go/pkg/util"
netmaprpc "github.com/nspcc-dev/neofs-contract/rpc/netmap"
"github.com/nspcc-dev/neofs-node/cmd/neofs-node/config"
apiclientconfig "github.com/nspcc-dev/neofs-node/cmd/neofs-node/config/apiclient"
contractsconfig "github.com/nspcc-dev/neofs-node/cmd/neofs-node/config/contracts"
Expand Down Expand Up @@ -820,42 +822,39 @@ func (c *cfg) handleLocalNodeInfoFromNetwork(ni *netmap.NodeInfo) {
c.localNodeInNetmap.Store(ni != nil)
}

// bootstrapWithState calls "addPeer" method of FS chain Netmap contract
// with the binary-encoded information from the current node's configuration.
// The state is set using the provided setter which MUST NOT be nil.
func (c *cfg) bootstrapWithState(stateSetter func(*netmap.NodeInfo)) error {
// bootstrapOnline puts node into the map with online state.
func (c *cfg) bootstrapOnline() error {
c.log.Info("bootstrapping with online state")
c.cfgNodeInfo.localInfoLock.RLock()
ni := c.cfgNodeInfo.localInfo
c.cfgNodeInfo.localInfoLock.RUnlock()
stateSetter(&ni)
ni.SetOnline()

prm := nmClient.AddPeerPrm{}
prm.SetNodeInfo(ni)

return c.cfgNetmap.wrapper.AddPeer(prm)
}

// bootstrapOnline calls cfg.bootstrapWithState with "online" state.
func bootstrapOnline(c *cfg) error {
return c.bootstrapWithState((*netmap.NodeInfo).SetOnline)
return c.cfgNetmap.wrapper.AddPeer(ni, c.key.PublicKey())
}

// bootstrap calls bootstrapWithState with:
// - "maintenance" state if maintenance is in progress on the current node
// - "online", otherwise
func (c *cfg) bootstrap() error {
// switch to online except when under maintenance
st := c.cfgNetmap.state.controlNetmapStatus()
if st == control.NetmapStatus_MAINTENANCE {
c.log.Info("bootstrapping with the maintenance state")
return c.bootstrapWithState((*netmap.NodeInfo).SetMaintenance)
}

c.log.Info("bootstrapping with online state",
zap.Stringer("previous", st),
// heartbeat sends AddNode and/or UpdatePeer transactions with the current
// node state which can be "maintenance" or "online".
func (c *cfg) heartbeat() error {
var (
currentStatus = c.cfgNetmap.state.controlNetmapStatus()
st *big.Int
)

return bootstrapOnline(c)
if !c.cfgNetmap.wrapper.IsNodeV2() {
err := c.bootstrapOnline()
if err != nil {
return err
}
}
switch currentStatus {
case control.NetmapStatus_MAINTENANCE:
st = netmaprpc.NodeStateMaintenance
case control.NetmapStatus_ONLINE:
st = netmaprpc.NodeStateOnline
default:
return fmt.Errorf("incorrect current network map status %v, restart recommended", currentStatus)
}
return c.updateNetMapState(st)
}

// needBootstrap checks if local node should be registered in network on bootup.
Expand Down
4 changes: 2 additions & 2 deletions cmd/neofs-node/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -227,9 +227,9 @@ func (c *cfg) restartMorph() error {
return nil
}

err = c.bootstrap()
err = c.heartbeat()
if err != nil {
c.log.Warn("failed to re-bootstrap", zap.Error(err))
c.log.Warn("failed to send heartbeat", zap.Error(err))
}

c.log.Info("internal services have been restarted after RPC connection loss")
Expand Down
78 changes: 43 additions & 35 deletions cmd/neofs-node/netmap.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,11 @@ import (
"bytes"
"errors"
"fmt"
"math/big"
"sync/atomic"

netmaprpc "github.com/nspcc-dev/neofs-contract/rpc/netmap"
"github.com/nspcc-dev/neofs-node/pkg/metrics"
nmClient "github.com/nspcc-dev/neofs-node/pkg/morph/client/netmap"
"github.com/nspcc-dev/neofs-node/pkg/morph/event"
netmapEvent "github.com/nspcc-dev/neofs-node/pkg/morph/event/netmap"
"github.com/nspcc-dev/neofs-node/pkg/network"
Expand Down Expand Up @@ -183,15 +184,9 @@ func initNetmapService(c *cfg) {
return
}

n := ev.(netmapEvent.NewEpoch).EpochNumber()

const reBootstrapInterval = 2

if (n-c.cfgNetmap.startEpoch)%reBootstrapInterval == 0 {
err := c.bootstrap()
if err != nil {
c.log.Warn("can't send re-bootstrap tx", zap.Error(err))
}
err := c.heartbeat()
if err != nil {
c.log.Warn("can't send heartbeat tx", zap.Error(err))
}
})

Expand Down Expand Up @@ -225,8 +220,19 @@ func initNetmapService(c *cfg) {
// Must be called after initNetmapService.
func bootstrapNode(c *cfg) {
if c.needBootstrap() {
err := c.bootstrap()
fatalOnErrDetails("bootstrap error", err)
if c.cfgNetmap.state.controlNetmapStatus() == control.NetmapStatus_OFFLINE {
c.log.Info("current state is offline")
err := c.bootstrapOnline()
fatalOnErrDetails("bootstrap error", err)
} else {
c.log.Info("network map contains this node, sending heartbeat")
err := c.heartbeat()
if err != nil {
// Not as critical as the one above, will be
// updated the next epoch.
c.log.Warn("heartbeat error", zap.Error(err))
}
}
}
}

Expand Down Expand Up @@ -335,28 +341,34 @@ func addNewEpochAsyncNotificationHandler(c *cfg, h event.Handler) {
var errRelayBootstrap = errors.New("setting netmap status is forbidden in relay mode")

func (c *cfg) SetNetmapStatus(st control.NetmapStatus) error {
switch st {
default:
return fmt.Errorf("unsupported status %v", st)
case control.NetmapStatus_MAINTENANCE:
return c.setMaintenanceStatus(false)
case control.NetmapStatus_ONLINE, control.NetmapStatus_OFFLINE:
}

c.stopMaintenance()

if !c.needBootstrap() {
return errRelayBootstrap
}

if st == control.NetmapStatus_ONLINE {
c.cfgNetmap.reBoostrapTurnedOff.Store(false)
return bootstrapOnline(c)
var currentStatus = c.cfgNetmap.state.controlNetmapStatus()

if currentStatus == st {
return nil // no-op
}

c.cfgNetmap.reBoostrapTurnedOff.Store(true)
if currentStatus == control.NetmapStatus_OFFLINE {
if st != control.NetmapStatus_ONLINE {
return errors.New("can't add non-online node to map")
}
return c.bootstrapOnline()
}

return c.updateNetMapState(func(*nmClient.UpdatePeerPrm) {})
switch st {
case control.NetmapStatus_OFFLINE:
return c.updateNetMapState(nil)
case control.NetmapStatus_MAINTENANCE:
return c.setMaintenanceStatus(false)
case control.NetmapStatus_ONLINE:
c.stopMaintenance()
return c.updateNetMapState(netmaprpc.NodeStateOnline)
default:
return fmt.Errorf("unsupported status %v", st)
}
}

func (c *cfg) ForceMaintenance() error {
Expand All @@ -375,7 +387,7 @@ func (c *cfg) setMaintenanceStatus(force bool) error {
c.startMaintenance()

if err == nil {
err = c.updateNetMapState((*nmClient.UpdatePeerPrm).SetMaintenance)
err = c.updateNetMapState(netmaprpc.NodeStateMaintenance)
}

if err != nil {
Expand All @@ -388,12 +400,8 @@ func (c *cfg) setMaintenanceStatus(force bool) error {

// calls UpdatePeerState operation of Netmap contract's client for the local node.
// The state argument specifies the node state to switch to.
func (c *cfg) updateNetMapState(stateSetter func(*nmClient.UpdatePeerPrm)) error {
var prm nmClient.UpdatePeerPrm
prm.SetKey(c.key.PublicKey().Bytes())
stateSetter(&prm)

return c.cfgNetmap.wrapper.UpdatePeerState(prm)
func (c *cfg) updateNetMapState(state *big.Int) error {
return c.cfgNetmap.wrapper.UpdatePeerState(c.key.PublicKey().Bytes(), state)
}

func (c *cfg) GetNetworkInfo() (netmapSDK.NetworkInfo, error) {
Expand Down Expand Up @@ -464,5 +472,5 @@ func (c *cfg) reloadNodeAttributes() error {
return nil
}

return c.bootstrap()
return c.bootstrapOnline()
}
Loading

0 comments on commit 3e0d534

Please sign in to comment.