Skip to content

Commit

Permalink
network: hybrid networking (#5800)
Browse files Browse the repository at this point in the history
  • Loading branch information
cce authored Nov 14, 2023
1 parent bcf71c1 commit b6f8a69
Show file tree
Hide file tree
Showing 29 changed files with 508 additions and 108 deletions.
19 changes: 13 additions & 6 deletions agreement/fuzzer/networkFacade_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,9 +70,16 @@ type NetworkFacade struct {
rand *rand.Rand
timeoutAtInitOnce sync.Once
timeoutAtInitWait sync.WaitGroup
peerToNode map[network.Peer]int
peerToNode map[*facadePeer]int
}

type facadePeer struct {
id int
net network.GossipNode
}

func (p *facadePeer) GetNetwork() network.GossipNode { return p.net }

// MakeNetworkFacade creates a facade with a given nodeID.
func MakeNetworkFacade(fuzzer *Fuzzer, nodeID int) *NetworkFacade {
n := &NetworkFacade{
Expand All @@ -83,12 +90,12 @@ func MakeNetworkFacade(fuzzer *Fuzzer, nodeID int) *NetworkFacade {
eventsQueues: make(map[string]int),
eventsQueuesCh: make(chan int, 1000),
rand: rand.New(rand.NewSource(int64(nodeID))),
peerToNode: make(map[network.Peer]int, fuzzer.nodesCount),
peerToNode: make(map[*facadePeer]int, fuzzer.nodesCount),
debugMessages: false,
}
n.timeoutAtInitWait.Add(1)
for i := 0; i < fuzzer.nodesCount; i++ {
n.peerToNode[network.Peer(new(int))] = i
n.peerToNode[&facadePeer{id: i, net: n}] = i
}
return n
}
Expand Down Expand Up @@ -179,7 +186,7 @@ func (n *NetworkFacade) WaitForEventsQueue(cleared bool) {
func (n *NetworkFacade) Broadcast(ctx context.Context, tag protocol.Tag, data []byte, wait bool, exclude network.Peer) error {
excludeNode := -1
if exclude != nil {
excludeNode = n.peerToNode[exclude]
excludeNode = n.peerToNode[exclude.(*facadePeer)]
}
return n.broadcast(tag, data, excludeNode, "NetworkFacade service-%v Broadcast %v %v\n")
}
Expand Down Expand Up @@ -341,8 +348,8 @@ func (n *NetworkFacade) ReceiveMessage(sourceNode int, tag protocol.Tag, data []
n.pushPendingReceivedMessage()
}

func (n *NetworkFacade) Disconnect(sender network.Peer) {
sourceNode := n.peerToNode[sender]
func (n *NetworkFacade) Disconnect(sender network.DisconnectablePeer) {
sourceNode := n.peerToNode[sender.(*facadePeer)]
n.fuzzer.Disconnect(n.nodeID, sourceNode)
}

Expand Down
2 changes: 1 addition & 1 deletion agreement/gossip/network_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ func (w *whiteholeNetwork) Relay(ctx context.Context, tag protocol.Tag, data []b
func (w *whiteholeNetwork) BroadcastSimple(tag protocol.Tag, data []byte) error {
return w.Broadcast(context.Background(), tag, data, true, nil)
}
func (w *whiteholeNetwork) Disconnect(badnode network.Peer) {
func (w *whiteholeNetwork) Disconnect(badnode network.DisconnectablePeer) {
return
}
func (w *whiteholeNetwork) DisconnectPeers() {
Expand Down
11 changes: 6 additions & 5 deletions catchup/fetcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ import (
"net"
"net/http"
"net/url"
"strings"
"testing"

"github.com/gorilla/mux"
Expand Down Expand Up @@ -174,8 +173,8 @@ func (b *basicRPCNode) GetPeers(options ...network.PeerOption) []network.Peer {
return b.peers
}

func (b *basicRPCNode) SubstituteGenesisID(rawURL string) string {
return strings.Replace(rawURL, "{genesisID}", "test genesisID", -1)
func (b *basicRPCNode) GetGenesisID() string {
return "test genesisID"
}

type httpTestPeerSource struct {
Expand All @@ -192,8 +191,8 @@ func (s *httpTestPeerSource) RegisterHandlers(dispatch []network.TaggedMessageHa
s.dispatchHandlers = append(s.dispatchHandlers, dispatch...)
}

func (s *httpTestPeerSource) SubstituteGenesisID(rawURL string) string {
return strings.Replace(rawURL, "{genesisID}", "test genesisID", -1)
func (s *httpTestPeerSource) GetGenesisID() string {
return "test genesisID"
}

// implement network.HTTPPeer
Expand Down Expand Up @@ -239,6 +238,8 @@ func (p *testUnicastPeer) GetAddress() string {
return "test"
}

func (p *testUnicastPeer) GetNetwork() network.GossipNode { return p.gn }

func (p *testUnicastPeer) Request(ctx context.Context, tag protocol.Tag, topics network.Topics) (resp *network.Response, e error) {

responseChannel := make(chan *network.Response, 1)
Expand Down
2 changes: 1 addition & 1 deletion catchup/ledgerFetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ func (lf *ledgerFetcher) requestLedger(ctx context.Context, peer network.HTTPPee
return nil, err
}

parsedURL.Path = lf.net.SubstituteGenesisID(path.Join(parsedURL.Path, "/v1/{genesisID}/ledger/"+strconv.FormatUint(uint64(round), 36)))
parsedURL.Path = network.SubstituteGenesisID(lf.net, path.Join(parsedURL.Path, "/v1/{genesisID}/ledger/"+strconv.FormatUint(uint64(round), 36)))
ledgerURL := parsedURL.String()
lf.log.Debugf("ledger %s %#v peer %#v %T", method, ledgerURL, peer, peer)
request, err := http.NewRequestWithContext(ctx, method, ledgerURL, nil)
Expand Down
14 changes: 9 additions & 5 deletions components/mocks/mockNetwork.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
// MockNetwork is a dummy network that doesn't do anything
type MockNetwork struct {
network.GossipNode
GenesisID string
}

// Broadcast - unused function
Expand Down Expand Up @@ -58,7 +59,7 @@ func (network *MockNetwork) RequestConnectOutgoing(replace bool, quit <-chan str
}

// Disconnect - unused function
func (network *MockNetwork) Disconnect(badpeer network.Peer) {
func (network *MockNetwork) Disconnect(badpeer network.DisconnectablePeer) {
}

// DisconnectPeers - unused function
Expand All @@ -75,7 +76,7 @@ func (network *MockNetwork) GetPeers(options ...network.PeerOption) []network.Pe
}

// GetRoundTripper -- returns the network round tripper
func (network *MockNetwork) GetRoundTripper() http.RoundTripper {
func (network *MockNetwork) GetRoundTripper(peer network.Peer) http.RoundTripper {
return http.DefaultTransport
}

Expand Down Expand Up @@ -106,7 +107,10 @@ func (network *MockNetwork) GetHTTPRequestConnection(request *http.Request) (con
return nil
}

// SubstituteGenesisID - empty implementation
func (network *MockNetwork) SubstituteGenesisID(rawURL string) string {
return rawURL
// GetGenesisID - empty implementation
func (network *MockNetwork) GetGenesisID() string {
if network.GenesisID == "" {
return "mocknet"
}
return network.GenesisID
}
6 changes: 6 additions & 0 deletions config/localTemplate.go
Original file line number Diff line number Diff line change
Expand Up @@ -585,6 +585,12 @@ type Local struct {
// EnableP2P turns on the peer to peer network
EnableP2P bool `version[31]:"false"`

// EnableP2PHybridMode turns on both websockets and P2P networking.
EnableP2PHybridMode bool `version[31]:"false"`

// P2PListenAddress sets the listen address used for P2P networking, if hybrid mode is set.
P2PListenAddress string `version[31]:""`

// P2PPersistPeerID will write the private key used for the node's PeerID to the P2PPrivateKeyLocation.
// This is only used when P2PEnable is true. If P2PPrivateKey is not specified, it uses the default location.
P2PPersistPeerID bool `version[29]:"false"`
Expand Down
2 changes: 2 additions & 0 deletions config/local_defaults.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ var defaultLocal = Local{
EnableMetricReporting: false,
EnableOutgoingNetworkMessageFiltering: true,
EnableP2P: false,
EnableP2PHybridMode: false,
EnablePingHandler: true,
EnableProcessBlockStats: false,
EnableProfiler: false,
Expand Down Expand Up @@ -117,6 +118,7 @@ var defaultLocal = Local{
OptimizeAccountsDatabaseOnStartup: false,
OutgoingMessageFilterBucketCount: 3,
OutgoingMessageFilterBucketSize: 128,
P2PListenAddress: "",
P2PPersistPeerID: false,
P2PPrivateKeyLocation: "",
ParticipationKeysRefreshInterval: 60000000000,
Expand Down
3 changes: 2 additions & 1 deletion data/txHandler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,8 @@ var txBacklogSize = config.GetDefaultLocal().TxBacklogSize
// mock sender is used to implement OnClose, since TXHandlers expect to use Senders and ERL Clients
type mockSender struct{}

func (m mockSender) OnClose(func()) {}
func (m mockSender) OnClose(func()) {}
func (m mockSender) GetNetwork() network.GossipNode { panic("not implemented") }

// txHandlerConfig is a subset of tx handler related options from config.Local
type txHandlerConfig struct {
Expand Down
2 changes: 2 additions & 0 deletions installer/config.json.example
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@
"EnableMetricReporting": false,
"EnableOutgoingNetworkMessageFiltering": true,
"EnableP2P": false,
"EnableP2PHybridMode": false,
"EnablePingHandler": true,
"EnableProcessBlockStats": false,
"EnableProfiler": false,
Expand Down Expand Up @@ -96,6 +97,7 @@
"OptimizeAccountsDatabaseOnStartup": false,
"OutgoingMessageFilterBucketCount": 3,
"OutgoingMessageFilterBucketSize": 128,
"P2PListenAddress": "",
"P2PPersistPeerID": false,
"P2PPrivateKeyLocation": "",
"ParticipationKeysRefreshInterval": 60000000000,
Expand Down
2 changes: 1 addition & 1 deletion network/connPerfMon_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ func makeMsgPool(N int, peers []Peer) (out []IncomingMessage) {

addMsg := func(msgCount int) {
for i := 0; i < msgCount; i++ {
msg.Sender = peers[(int(msgIndex)+i)%len(peers)]
msg.Sender = peers[(int(msgIndex)+i)%len(peers)].(DisconnectablePeer)
timer += int64(7 * time.Nanosecond)
msg.Received = timer
out = append(out, msg)
Expand Down
21 changes: 16 additions & 5 deletions network/gossipNode.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"context"
"net"
"net/http"
"strings"

"github.com/algorand/go-algorand/config"
"github.com/algorand/go-algorand/protocol"
Expand All @@ -28,6 +29,11 @@ import (
// Peer opaque interface for referring to a neighbor in the network
type Peer interface{}

// DisconnectablePeer is a Peer with a long-living connection to a network that can be disconnected
type DisconnectablePeer interface {
GetNetwork() GossipNode
}

// PeerOption allows users to specify a subset of peers to query
//
//msgp:ignore PeerOption
Expand All @@ -51,7 +57,7 @@ type GossipNode interface {
Address() (string, bool)
Broadcast(ctx context.Context, tag protocol.Tag, data []byte, wait bool, except Peer) error
Relay(ctx context.Context, tag protocol.Tag, data []byte, wait bool, except Peer) error
Disconnect(badnode Peer)
Disconnect(badnode DisconnectablePeer)
DisconnectPeers() // only used by testing

// RegisterHTTPHandler path accepts gorilla/mux path annotations
Expand All @@ -78,7 +84,7 @@ type GossipNode interface {
ClearHandlers()

// GetRoundTripper returns a Transport that would limit the number of outgoing connections.
GetRoundTripper() http.RoundTripper
GetRoundTripper(peer Peer) http.RoundTripper

// OnNetworkAdvance notifies the network library that the agreement protocol was able to make a notable progress.
// this is the only indication that we have that we haven't formed a clique, where all incoming messages
Expand All @@ -90,8 +96,8 @@ type GossipNode interface {
// request that was provided to the http handler ( or provide a fallback Context() to that )
GetHTTPRequestConnection(request *http.Request) (conn net.Conn)

// SubstituteGenesisID substitutes the "{genesisID}" with their network-specific genesisID.
SubstituteGenesisID(rawURL string) string
// GetGenesisID returns the network-specific genesisID.
GetGenesisID() string

// called from wsPeer to report that it has closed
peerRemoteClose(peer *wsPeer, reason disconnectReason)
Expand All @@ -109,7 +115,7 @@ var outgoingMessagesBufferSize = int(

// IncomingMessage represents a message arriving from some peer in our p2p network
type IncomingMessage struct {
Sender Peer
Sender DisconnectablePeer
Tag Tag
Data []byte
Err error
Expand Down Expand Up @@ -207,3 +213,8 @@ func max(numbers ...uint64) (maxNum uint64) {
}
return
}

// SubstituteGenesisID substitutes the "{genesisID}" with their network-specific genesisID.
func SubstituteGenesisID(net GossipNode, rawURL string) string {
return strings.Replace(rawURL, "{genesisID}", net.GetGenesisID(), -1)
}
Loading

0 comments on commit b6f8a69

Please sign in to comment.