diff --git a/agreement/fuzzer/networkFacade_test.go b/agreement/fuzzer/networkFacade_test.go index d18f8251dd..35eba4a273 100644 --- a/agreement/fuzzer/networkFacade_test.go +++ b/agreement/fuzzer/networkFacade_test.go @@ -70,9 +70,16 @@ type NetworkFacade struct { rand *rand.Rand timeoutAtInitOnce sync.Once timeoutAtInitWait sync.WaitGroup - peerToNode map[network.Peer]int + peerToNode map[*facadePeer]int } +type facadePeer struct { + id int + net network.GossipNode +} + +func (p *facadePeer) GetNetwork() network.GossipNode { return p.net } + // MakeNetworkFacade creates a facade with a given nodeID. func MakeNetworkFacade(fuzzer *Fuzzer, nodeID int) *NetworkFacade { n := &NetworkFacade{ @@ -83,12 +90,12 @@ func MakeNetworkFacade(fuzzer *Fuzzer, nodeID int) *NetworkFacade { eventsQueues: make(map[string]int), eventsQueuesCh: make(chan int, 1000), rand: rand.New(rand.NewSource(int64(nodeID))), - peerToNode: make(map[network.Peer]int, fuzzer.nodesCount), + peerToNode: make(map[*facadePeer]int, fuzzer.nodesCount), debugMessages: false, } n.timeoutAtInitWait.Add(1) for i := 0; i < fuzzer.nodesCount; i++ { - n.peerToNode[network.Peer(new(int))] = i + n.peerToNode[&facadePeer{id: i, net: n}] = i } return n } @@ -179,7 +186,7 @@ func (n *NetworkFacade) WaitForEventsQueue(cleared bool) { func (n *NetworkFacade) Broadcast(ctx context.Context, tag protocol.Tag, data []byte, wait bool, exclude network.Peer) error { excludeNode := -1 if exclude != nil { - excludeNode = n.peerToNode[exclude] + excludeNode = n.peerToNode[exclude.(*facadePeer)] } return n.broadcast(tag, data, excludeNode, "NetworkFacade service-%v Broadcast %v %v\n") } @@ -341,8 +348,8 @@ func (n *NetworkFacade) ReceiveMessage(sourceNode int, tag protocol.Tag, data [] n.pushPendingReceivedMessage() } -func (n *NetworkFacade) Disconnect(sender network.Peer) { - sourceNode := n.peerToNode[sender] +func (n *NetworkFacade) Disconnect(sender network.DisconnectablePeer) { + sourceNode := n.peerToNode[sender.(*facadePeer)] n.fuzzer.Disconnect(n.nodeID, sourceNode) } diff --git a/agreement/gossip/network_test.go b/agreement/gossip/network_test.go index 8584dc8d26..3607afb168 100644 --- a/agreement/gossip/network_test.go +++ b/agreement/gossip/network_test.go @@ -136,7 +136,7 @@ func (w *whiteholeNetwork) Relay(ctx context.Context, tag protocol.Tag, data []b func (w *whiteholeNetwork) BroadcastSimple(tag protocol.Tag, data []byte) error { return w.Broadcast(context.Background(), tag, data, true, nil) } -func (w *whiteholeNetwork) Disconnect(badnode network.Peer) { +func (w *whiteholeNetwork) Disconnect(badnode network.DisconnectablePeer) { return } func (w *whiteholeNetwork) DisconnectPeers() { diff --git a/catchup/fetcher_test.go b/catchup/fetcher_test.go index 983de01475..02224d7bdd 100644 --- a/catchup/fetcher_test.go +++ b/catchup/fetcher_test.go @@ -21,7 +21,6 @@ import ( "net" "net/http" "net/url" - "strings" "testing" "github.com/gorilla/mux" @@ -174,8 +173,8 @@ func (b *basicRPCNode) GetPeers(options ...network.PeerOption) []network.Peer { return b.peers } -func (b *basicRPCNode) SubstituteGenesisID(rawURL string) string { - return strings.Replace(rawURL, "{genesisID}", "test genesisID", -1) +func (b *basicRPCNode) GetGenesisID() string { + return "test genesisID" } type httpTestPeerSource struct { @@ -192,8 +191,8 @@ func (s *httpTestPeerSource) RegisterHandlers(dispatch []network.TaggedMessageHa s.dispatchHandlers = append(s.dispatchHandlers, dispatch...) } -func (s *httpTestPeerSource) SubstituteGenesisID(rawURL string) string { - return strings.Replace(rawURL, "{genesisID}", "test genesisID", -1) +func (s *httpTestPeerSource) GetGenesisID() string { + return "test genesisID" } // implement network.HTTPPeer @@ -239,6 +238,8 @@ func (p *testUnicastPeer) GetAddress() string { return "test" } +func (p *testUnicastPeer) GetNetwork() network.GossipNode { return p.gn } + func (p *testUnicastPeer) Request(ctx context.Context, tag protocol.Tag, topics network.Topics) (resp *network.Response, e error) { responseChannel := make(chan *network.Response, 1) diff --git a/catchup/ledgerFetcher.go b/catchup/ledgerFetcher.go index 43a039a09e..971aeb5eb5 100644 --- a/catchup/ledgerFetcher.go +++ b/catchup/ledgerFetcher.go @@ -79,7 +79,7 @@ func (lf *ledgerFetcher) requestLedger(ctx context.Context, peer network.HTTPPee return nil, err } - parsedURL.Path = lf.net.SubstituteGenesisID(path.Join(parsedURL.Path, "/v1/{genesisID}/ledger/"+strconv.FormatUint(uint64(round), 36))) + parsedURL.Path = network.SubstituteGenesisID(lf.net, path.Join(parsedURL.Path, "/v1/{genesisID}/ledger/"+strconv.FormatUint(uint64(round), 36))) ledgerURL := parsedURL.String() lf.log.Debugf("ledger %s %#v peer %#v %T", method, ledgerURL, peer, peer) request, err := http.NewRequestWithContext(ctx, method, ledgerURL, nil) diff --git a/components/mocks/mockNetwork.go b/components/mocks/mockNetwork.go index 8c8eb113f1..e1ceb63142 100644 --- a/components/mocks/mockNetwork.go +++ b/components/mocks/mockNetwork.go @@ -28,6 +28,7 @@ import ( // MockNetwork is a dummy network that doesn't do anything type MockNetwork struct { network.GossipNode + GenesisID string } // Broadcast - unused function @@ -58,7 +59,7 @@ func (network *MockNetwork) RequestConnectOutgoing(replace bool, quit <-chan str } // Disconnect - unused function -func (network *MockNetwork) Disconnect(badpeer network.Peer) { +func (network *MockNetwork) Disconnect(badpeer network.DisconnectablePeer) { } // DisconnectPeers - unused function @@ -75,7 +76,7 @@ func (network *MockNetwork) GetPeers(options ...network.PeerOption) []network.Pe } // GetRoundTripper -- returns the network round tripper -func (network *MockNetwork) GetRoundTripper() http.RoundTripper { +func (network *MockNetwork) GetRoundTripper(peer network.Peer) http.RoundTripper { return http.DefaultTransport } @@ -106,7 +107,10 @@ func (network *MockNetwork) GetHTTPRequestConnection(request *http.Request) (con return nil } -// SubstituteGenesisID - empty implementation -func (network *MockNetwork) SubstituteGenesisID(rawURL string) string { - return rawURL +// GetGenesisID - empty implementation +func (network *MockNetwork) GetGenesisID() string { + if network.GenesisID == "" { + return "mocknet" + } + return network.GenesisID } diff --git a/config/localTemplate.go b/config/localTemplate.go index 9c547720e0..694a926015 100644 --- a/config/localTemplate.go +++ b/config/localTemplate.go @@ -585,6 +585,12 @@ type Local struct { // EnableP2P turns on the peer to peer network EnableP2P bool `version[31]:"false"` + // EnableP2PHybridMode turns on both websockets and P2P networking. + EnableP2PHybridMode bool `version[31]:"false"` + + // P2PListenAddress sets the listen address used for P2P networking, if hybrid mode is set. + P2PListenAddress string `version[31]:""` + // P2PPersistPeerID will write the private key used for the node's PeerID to the P2PPrivateKeyLocation. // This is only used when P2PEnable is true. If P2PPrivateKey is not specified, it uses the default location. P2PPersistPeerID bool `version[29]:"false"` diff --git a/config/local_defaults.go b/config/local_defaults.go index 2aa1ba3be9..b1c1f23784 100644 --- a/config/local_defaults.go +++ b/config/local_defaults.go @@ -76,6 +76,7 @@ var defaultLocal = Local{ EnableMetricReporting: false, EnableOutgoingNetworkMessageFiltering: true, EnableP2P: false, + EnableP2PHybridMode: false, EnablePingHandler: true, EnableProcessBlockStats: false, EnableProfiler: false, @@ -117,6 +118,7 @@ var defaultLocal = Local{ OptimizeAccountsDatabaseOnStartup: false, OutgoingMessageFilterBucketCount: 3, OutgoingMessageFilterBucketSize: 128, + P2PListenAddress: "", P2PPersistPeerID: false, P2PPrivateKeyLocation: "", ParticipationKeysRefreshInterval: 60000000000, diff --git a/data/txHandler_test.go b/data/txHandler_test.go index 486bd7c0f8..ea622ca68b 100644 --- a/data/txHandler_test.go +++ b/data/txHandler_test.go @@ -60,7 +60,8 @@ var txBacklogSize = config.GetDefaultLocal().TxBacklogSize // mock sender is used to implement OnClose, since TXHandlers expect to use Senders and ERL Clients type mockSender struct{} -func (m mockSender) OnClose(func()) {} +func (m mockSender) OnClose(func()) {} +func (m mockSender) GetNetwork() network.GossipNode { panic("not implemented") } // txHandlerConfig is a subset of tx handler related options from config.Local type txHandlerConfig struct { diff --git a/installer/config.json.example b/installer/config.json.example index b4de6dd5df..1179a5b629 100644 --- a/installer/config.json.example +++ b/installer/config.json.example @@ -55,6 +55,7 @@ "EnableMetricReporting": false, "EnableOutgoingNetworkMessageFiltering": true, "EnableP2P": false, + "EnableP2PHybridMode": false, "EnablePingHandler": true, "EnableProcessBlockStats": false, "EnableProfiler": false, @@ -96,6 +97,7 @@ "OptimizeAccountsDatabaseOnStartup": false, "OutgoingMessageFilterBucketCount": 3, "OutgoingMessageFilterBucketSize": 128, + "P2PListenAddress": "", "P2PPersistPeerID": false, "P2PPrivateKeyLocation": "", "ParticipationKeysRefreshInterval": 60000000000, diff --git a/network/connPerfMon_test.go b/network/connPerfMon_test.go index a8398b0f36..ae9038c5ce 100644 --- a/network/connPerfMon_test.go +++ b/network/connPerfMon_test.go @@ -48,7 +48,7 @@ func makeMsgPool(N int, peers []Peer) (out []IncomingMessage) { addMsg := func(msgCount int) { for i := 0; i < msgCount; i++ { - msg.Sender = peers[(int(msgIndex)+i)%len(peers)] + msg.Sender = peers[(int(msgIndex)+i)%len(peers)].(DisconnectablePeer) timer += int64(7 * time.Nanosecond) msg.Received = timer out = append(out, msg) diff --git a/network/gossipNode.go b/network/gossipNode.go index 7ae667170a..8d1b877449 100644 --- a/network/gossipNode.go +++ b/network/gossipNode.go @@ -20,6 +20,7 @@ import ( "context" "net" "net/http" + "strings" "github.com/algorand/go-algorand/config" "github.com/algorand/go-algorand/protocol" @@ -28,6 +29,11 @@ import ( // Peer opaque interface for referring to a neighbor in the network type Peer interface{} +// DisconnectablePeer is a Peer with a long-living connection to a network that can be disconnected +type DisconnectablePeer interface { + GetNetwork() GossipNode +} + // PeerOption allows users to specify a subset of peers to query // //msgp:ignore PeerOption @@ -51,7 +57,7 @@ type GossipNode interface { Address() (string, bool) Broadcast(ctx context.Context, tag protocol.Tag, data []byte, wait bool, except Peer) error Relay(ctx context.Context, tag protocol.Tag, data []byte, wait bool, except Peer) error - Disconnect(badnode Peer) + Disconnect(badnode DisconnectablePeer) DisconnectPeers() // only used by testing // RegisterHTTPHandler path accepts gorilla/mux path annotations @@ -78,7 +84,7 @@ type GossipNode interface { ClearHandlers() // GetRoundTripper returns a Transport that would limit the number of outgoing connections. - GetRoundTripper() http.RoundTripper + GetRoundTripper(peer Peer) http.RoundTripper // OnNetworkAdvance notifies the network library that the agreement protocol was able to make a notable progress. // this is the only indication that we have that we haven't formed a clique, where all incoming messages @@ -90,8 +96,8 @@ type GossipNode interface { // request that was provided to the http handler ( or provide a fallback Context() to that ) GetHTTPRequestConnection(request *http.Request) (conn net.Conn) - // SubstituteGenesisID substitutes the "{genesisID}" with their network-specific genesisID. - SubstituteGenesisID(rawURL string) string + // GetGenesisID returns the network-specific genesisID. + GetGenesisID() string // called from wsPeer to report that it has closed peerRemoteClose(peer *wsPeer, reason disconnectReason) @@ -109,7 +115,7 @@ var outgoingMessagesBufferSize = int( // IncomingMessage represents a message arriving from some peer in our p2p network type IncomingMessage struct { - Sender Peer + Sender DisconnectablePeer Tag Tag Data []byte Err error @@ -207,3 +213,8 @@ func max(numbers ...uint64) (maxNum uint64) { } return } + +// SubstituteGenesisID substitutes the "{genesisID}" with their network-specific genesisID. +func SubstituteGenesisID(net GossipNode, rawURL string) string { + return strings.Replace(rawURL, "{genesisID}", net.GetGenesisID(), -1) +} diff --git a/network/hybridNetwork.go b/network/hybridNetwork.go new file mode 100644 index 0000000000..c09c5d4a1a --- /dev/null +++ b/network/hybridNetwork.go @@ -0,0 +1,221 @@ +// Copyright (C) 2019-2023 Algorand, Inc. +// This file is part of go-algorand +// +// go-algorand is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as +// published by the Free Software Foundation, either version 3 of the +// License, or (at your option) any later version. +// +// go-algorand is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with go-algorand. If not, see . + +package network + +import ( + "context" + "fmt" + "net" + "net/http" + "sync" + + "github.com/algorand/go-algorand/config" + "github.com/algorand/go-algorand/logging" + "github.com/algorand/go-algorand/protocol" +) + +// HybridP2PNetwork runs both P2PNetwork and WebsocketNetwork to implement the GossipNode interface +type HybridP2PNetwork struct { + p2pNetwork *P2PNetwork + wsNetwork *WebsocketNetwork + genesisID string + + useP2PAddress bool +} + +// NewHybridP2PNetwork constructs a GossipNode that combines P2PNetwork and WebsocketNetwork +func NewHybridP2PNetwork(log logging.Logger, cfg config.Local, datadir string, phonebookAddresses []string, genesisID string, networkID protocol.NetworkID, nodeInfo NodeInfo) (*HybridP2PNetwork, error) { + // supply alternate NetAddress for P2P network + p2pcfg := cfg + p2pcfg.NetAddress = cfg.P2PListenAddress + p2pnet, err := NewP2PNetwork(log, p2pcfg, datadir, phonebookAddresses, genesisID, networkID) + if err != nil { + return nil, err + } + wsnet, err := NewWebsocketNetwork(log, cfg, phonebookAddresses, genesisID, networkID, nodeInfo, p2pnet.PeerID(), p2pnet.PeerIDSigner()) + if err != nil { + return nil, err + } + return &HybridP2PNetwork{ + p2pNetwork: p2pnet, + wsNetwork: wsnet, + genesisID: genesisID, + }, nil +} + +// Address implements GossipNode +func (n *HybridP2PNetwork) Address() (string, bool) { + // TODO map from configuration? used for REST API, goal status, algod.net, etc + if n.useP2PAddress { + return n.p2pNetwork.Address() + } + return n.wsNetwork.Address() +} + +type hybridNetworkError struct{ p2pErr, wsErr error } + +func (e *hybridNetworkError) Error() string { + return fmt.Sprintf("p2pErr: %s, wsErr: %s", e.p2pErr, e.wsErr) +} +func (e *hybridNetworkError) Unwrap() []error { return []error{e.p2pErr, e.wsErr} } + +func (n *HybridP2PNetwork) runParallel(fn func(net GossipNode) error) error { + var wg sync.WaitGroup + var p2pErr, wsErr error + + wg.Add(2) + go func() { + defer wg.Done() + p2pErr = fn(n.p2pNetwork) + }() + go func() { + defer wg.Done() + wsErr = fn(n.wsNetwork) + }() + wg.Wait() + + if p2pErr != nil && wsErr != nil { + return &hybridNetworkError{p2pErr, wsErr} + } + if p2pErr != nil { + return p2pErr + } + if wsErr != nil { + return wsErr + } + return nil +} + +// Broadcast implements GossipNode +func (n *HybridP2PNetwork) Broadcast(ctx context.Context, tag protocol.Tag, data []byte, wait bool, except Peer) error { + return n.runParallel(func(net GossipNode) error { + return net.Broadcast(ctx, tag, data, wait, except) + }) +} + +// Relay implements GossipNode +func (n *HybridP2PNetwork) Relay(ctx context.Context, tag protocol.Tag, data []byte, wait bool, except Peer) error { + return n.runParallel(func(net GossipNode) error { + return net.Relay(ctx, tag, data, wait, except) + }) +} + +// Disconnect implements GossipNode +func (n *HybridP2PNetwork) Disconnect(badnode DisconnectablePeer) { + net := badnode.GetNetwork() + if net == n.p2pNetwork { + n.p2pNetwork.Disconnect(badnode) + } else if net == n.wsNetwork { + n.wsNetwork.Disconnect(badnode) + } else { + panic("badnode.GetNetwork() returned a network that is not part of this HybridP2PNetwork") + } +} + +// DisconnectPeers implements GossipNode +func (n *HybridP2PNetwork) DisconnectPeers() { + _ = n.runParallel(func(net GossipNode) error { + net.DisconnectPeers() + return nil + }) +} + +// RegisterHTTPHandler implements GossipNode +func (n *HybridP2PNetwork) RegisterHTTPHandler(path string, handler http.Handler) { + n.p2pNetwork.RegisterHTTPHandler(path, handler) + n.wsNetwork.RegisterHTTPHandler(path, handler) +} + +// RequestConnectOutgoing implements GossipNode +func (n *HybridP2PNetwork) RequestConnectOutgoing(replace bool, quit <-chan struct{}) {} + +// GetPeers implements GossipNode +func (n *HybridP2PNetwork) GetPeers(options ...PeerOption) []Peer { + // TODO better way of combining data from peerstore and returning in GetPeers + var peers []Peer + peers = append(peers, n.p2pNetwork.GetPeers(options...)...) + peers = append(peers, n.wsNetwork.GetPeers(options...)...) + return peers +} + +// Start implements GossipNode +func (n *HybridP2PNetwork) Start() { + _ = n.runParallel(func(net GossipNode) error { + net.Start() + return nil + }) +} + +// Stop implements GossipNode +func (n *HybridP2PNetwork) Stop() { + _ = n.runParallel(func(net GossipNode) error { + net.Start() + return nil + }) +} + +// RegisterHandlers adds to the set of given message handlers. +func (n *HybridP2PNetwork) RegisterHandlers(dispatch []TaggedMessageHandler) { + n.p2pNetwork.RegisterHandlers(dispatch) + n.wsNetwork.RegisterHandlers(dispatch) +} + +// ClearHandlers deregisters all the existing message handlers. +func (n *HybridP2PNetwork) ClearHandlers() { + n.p2pNetwork.ClearHandlers() + n.wsNetwork.ClearHandlers() +} + +// GetRoundTripper returns a Transport that would limit the number of outgoing connections. +func (n *HybridP2PNetwork) GetRoundTripper(peer Peer) http.RoundTripper { + // TODO today this is used by HTTPTxSync.Sync after calling GetPeers(network.PeersPhonebookRelays) + switch p := peer.(type) { + case *wsPeer: + return p.net.GetRoundTripper(peer) + case gossipSubPeer: + return p.net.GetRoundTripper(peer) + default: + panic("unrecognized peer type") + } +} + +// OnNetworkAdvance notifies the network library that the agreement protocol was able to make a notable progress. +// this is the only indication that we have that we haven't formed a clique, where all incoming messages +// arrive very quickly, but might be missing some votes. The usage of this call is expected to have similar +// characteristics as with a watchdog timer. +func (n *HybridP2PNetwork) OnNetworkAdvance() { + _ = n.runParallel(func(net GossipNode) error { + net.OnNetworkAdvance() + return nil + }) +} + +// GetHTTPRequestConnection returns the underlying connection for the given request. Note that the request must be the same +// request that was provided to the http handler ( or provide a fallback Context() to that ) +func (n *HybridP2PNetwork) GetHTTPRequestConnection(request *http.Request) (conn net.Conn) { + return nil +} + +// GetGenesisID returns the network-specific genesisID. +func (n *HybridP2PNetwork) GetGenesisID() string { + return n.genesisID +} + +// called from wsPeer to report that it has closed +func (n *HybridP2PNetwork) peerRemoteClose(peer *wsPeer, reason disconnectReason) { + panic("wsPeer should only call WebsocketNetwork.peerRemoteClose or P2PNetwork.peerRemoteClose") +} diff --git a/network/netidentity.go b/network/netidentity.go index 940ea0a633..e7bee84835 100644 --- a/network/netidentity.go +++ b/network/netidentity.go @@ -94,12 +94,39 @@ type identityChallengeScheme interface { VerifyResponse(h http.Header, c identityChallengeValue) (crypto.PublicKey, []byte, error) } +type identityChallengeSigner interface { + Sign(message crypto.Hashable) crypto.Signature + SignBytes(message []byte) crypto.Signature + Verify(message crypto.Hashable, sig crypto.Signature) bool + PublicKey() crypto.PublicKey +} + +type identityChallengeLegacySigner struct { + keys *crypto.SignatureSecrets +} + +func (s *identityChallengeLegacySigner) Sign(message crypto.Hashable) crypto.Signature { + return s.keys.Sign(message) +} + +func (s *identityChallengeLegacySigner) SignBytes(message []byte) crypto.Signature { + return s.keys.SignBytes(message) +} + +func (s *identityChallengeLegacySigner) Verify(message crypto.Hashable, sig crypto.Signature) bool { + return s.keys.SignatureVerifier.Verify(message, sig) +} + +func (s *identityChallengeLegacySigner) PublicKey() crypto.PublicKey { + return s.keys.SignatureVerifier +} + // identityChallengePublicKeyScheme implements IdentityChallengeScheme by // exchanging and verifying public key challenges and attaching them to headers, // or returning the message payload to be sent type identityChallengePublicKeyScheme struct { dedupName string - identityKeys *crypto.SignatureSecrets + identityKeys identityChallengeSigner } // NewIdentityChallengeScheme will create a default Identification Scheme @@ -108,15 +135,21 @@ func NewIdentityChallengeScheme(dn string) *identityChallengePublicKeyScheme { if dn == "" { return &identityChallengePublicKeyScheme{} } + var seed crypto.Seed crypto.RandBytes(seed[:]) return &identityChallengePublicKeyScheme{ dedupName: dn, - identityKeys: crypto.GenerateSignatureSecrets(seed), + identityKeys: &identityChallengeLegacySigner{keys: crypto.GenerateSignatureSecrets(seed)}, } } +// NewIdentityChallengeSchemeWithSigner will create an identification Scheme with a given signer +func NewIdentityChallengeSchemeWithSigner(dn string, signer identityChallengeSigner) *identityChallengePublicKeyScheme { + return &identityChallengePublicKeyScheme{dedupName: dn, identityKeys: signer} +} + // AttachChallenge will generate a new identity challenge and will encode and attach the challenge // as a header. It returns the identityChallengeValue used for this challenge, so the network can // confirm it later (by passing it to VerifyResponse), or returns an empty challenge if dedupName is @@ -126,7 +159,7 @@ func (i identityChallengePublicKeyScheme) AttachChallenge(attachTo http.Header, return identityChallengeValue{} } c := identityChallenge{ - Key: i.identityKeys.SignatureVerifier, + Key: i.identityKeys.PublicKey(), Challenge: newIdentityChallengeValue(), PublicAddress: []byte(addr), } @@ -173,7 +206,7 @@ func (i identityChallengePublicKeyScheme) VerifyRequestAndAttachResponse(attachT } // make the response object, encode it and attach it to the header r := identityChallengeResponse{ - Key: i.identityKeys.SignatureVerifier, + Key: i.identityKeys.PublicKey(), Challenge: idChal.Msg.Challenge, ResponseChallenge: newIdentityChallengeValue(), } @@ -271,12 +304,12 @@ type identityVerificationMessageSigned struct { Signature crypto.Signature `codec:"sig"` } -func (i identityChallenge) signAndEncodeB64(s *crypto.SignatureSecrets) string { +func (i identityChallenge) signAndEncodeB64(s identityChallengeSigner) string { signedChal := i.Sign(s) return base64.StdEncoding.EncodeToString(protocol.Encode(&signedChal)) } -func (i identityChallenge) Sign(secrets *crypto.SignatureSecrets) identityChallengeSigned { +func (i identityChallenge) Sign(secrets identityChallengeSigner) identityChallengeSigned { return identityChallengeSigned{Msg: i, Signature: secrets.Sign(i)} } @@ -289,12 +322,12 @@ func (i identityChallengeSigned) Verify() bool { return i.Msg.Key.Verify(i.Msg, i.Signature) } -func (i identityChallengeResponse) signAndEncodeB64(s *crypto.SignatureSecrets) string { +func (i identityChallengeResponse) signAndEncodeB64(s identityChallengeSigner) string { signedChalResp := i.Sign(s) return base64.StdEncoding.EncodeToString(protocol.Encode(&signedChalResp)) } -func (i identityChallengeResponse) Sign(secrets *crypto.SignatureSecrets) identityChallengeResponseSigned { +func (i identityChallengeResponse) Sign(secrets identityChallengeSigner) identityChallengeResponseSigned { return identityChallengeResponseSigned{Msg: i, Signature: secrets.Sign(i)} } @@ -307,7 +340,7 @@ func (i identityChallengeResponseSigned) Verify() bool { return i.Msg.Key.Verify(i.Msg, i.Signature) } -func (i identityVerificationMessage) Sign(secrets *crypto.SignatureSecrets) identityVerificationMessageSigned { +func (i identityVerificationMessage) Sign(secrets identityChallengeSigner) identityVerificationMessageSigned { return identityVerificationMessageSigned{Msg: i, Signature: secrets.Sign(i)} } diff --git a/network/netidentity_test.go b/network/netidentity_test.go index 9222da4600..8093d01b22 100644 --- a/network/netidentity_test.go +++ b/network/netidentity_test.go @@ -180,7 +180,7 @@ func TestIdentityChallengeSchemeBadSignature(t *testing.T) { // Copy the logic of attaching the header and signing so we can sign it wrong c := identityChallengeSigned{ Msg: identityChallenge{ - Key: i.identityKeys.SignatureVerifier, + Key: i.identityKeys.PublicKey(), Challenge: newIdentityChallengeValue(), PublicAddress: []byte("i1"), }} @@ -232,7 +232,7 @@ func TestIdentityChallengeSchemeBadResponseSignature(t *testing.T) { r := http.Header{} resp := identityChallengeResponseSigned{ Msg: identityChallengeResponse{ - Key: i.identityKeys.SignatureVerifier, + Key: i.identityKeys.PublicKey(), Challenge: origChal, ResponseChallenge: newIdentityChallengeValue(), }} diff --git a/network/p2p/p2p.go b/network/p2p/p2p.go index 42db4694c4..d4694a874f 100644 --- a/network/p2p/p2p.go +++ b/network/p2p/p2p.go @@ -29,18 +29,21 @@ import ( "github.com/libp2p/go-libp2p" pubsub "github.com/libp2p/go-libp2p-pubsub" + "github.com/libp2p/go-libp2p/core/crypto" "github.com/libp2p/go-libp2p/core/host" "github.com/libp2p/go-libp2p/core/network" "github.com/libp2p/go-libp2p/core/peer" "github.com/libp2p/go-libp2p/core/peerstore" "github.com/libp2p/go-libp2p/p2p/muxer/yamux" + "github.com/libp2p/go-libp2p/p2p/security/noise" "github.com/libp2p/go-libp2p/p2p/transport/tcp" ) // Service defines the interface used by the network integrating with underlying p2p implementation type Service interface { Close() error - ID() peer.ID // return peer.ID for self + ID() peer.ID // return peer.ID for self + IDSigner() *PeerIDChallengeSigner AddrInfo() peer.AddrInfo // return addrInfo for self DialNode(context.Context, *peer.AddrInfo) error @@ -60,6 +63,7 @@ type serviceImpl struct { streams *streamManager pubsub *pubsub.PubSub pubsubCtx context.Context + privKey crypto.PrivKey topics map[string]*pubsub.Topic topicsMu deadlock.RWMutex @@ -99,6 +103,7 @@ func makeHost(cfg config.Local, datadir string, pstore peerstore.Peerstore) (hos libp2p.Muxer("/yamux/1.0.0", &ymx), libp2p.Peerstore(pstore), libp2p.ListenAddrStrings(listenAddr), + libp2p.Security(noise.ID, noise.New), ) } @@ -125,6 +130,7 @@ func MakeService(ctx context.Context, log logging.Logger, cfg config.Local, data streams: sm, pubsub: ps, pubsubCtx: ctx, + privKey: privKey, topics: make(map[string]*pubsub.Topic), }, nil } @@ -139,6 +145,11 @@ func (s *serviceImpl) ID() peer.ID { return s.host.ID() } +// IDSigner returns a PeerIDChallengeSigner that implements the network identityChallengeSigner interface +func (s *serviceImpl) IDSigner() *PeerIDChallengeSigner { + return &PeerIDChallengeSigner{key: s.privKey} +} + // DialPeersUntilTargetCount attempts to establish connections to the provided phonebook addresses func (s *serviceImpl) DialPeersUntilTargetCount(targetConnCount int) { peerIDs := s.host.Peerstore().Peers() diff --git a/network/p2p/peerID.go b/network/p2p/peerID.go index 4d808b05e9..9050bc021d 100644 --- a/network/p2p/peerID.go +++ b/network/p2p/peerID.go @@ -25,6 +25,7 @@ import ( "path" "github.com/algorand/go-algorand/config" + algocrypto "github.com/algorand/go-algorand/crypto" "github.com/algorand/go-algorand/util" "github.com/libp2p/go-libp2p/core/crypto" @@ -104,3 +105,49 @@ func generatePrivKey() (crypto.PrivKey, error) { priv, _, err := crypto.GenerateEd25519Key(rand.Reader) return priv, err } + +// PeerIDChallengeSigner implements the identityChallengeSigner interface in the network package. +type PeerIDChallengeSigner struct { + key crypto.PrivKey +} + +// Sign implements the identityChallengeSigner interface. +func (p *PeerIDChallengeSigner) Sign(message algocrypto.Hashable) algocrypto.Signature { + return p.SignBytes(algocrypto.HashRep(message)) +} + +// SignBytes implements the identityChallengeSigner interface. +func (p *PeerIDChallengeSigner) SignBytes(message []byte) algocrypto.Signature { + // libp2p Ed25519PrivateKey.Sign() returns a signature with a length of 64 bytes and no error + sig, err := p.key.Sign(message) + if len(sig) != len(algocrypto.Signature{}) { + panic(fmt.Sprintf("invalid signature length: %d", len(sig))) + } + if err != nil { + panic(err) + } + return algocrypto.Signature(sig) +} + +// Verify implements the identityChallengeSigner interface. +func (p *PeerIDChallengeSigner) Verify(message algocrypto.Hashable, sig algocrypto.Signature) bool { + // libp2p Ed25519PublicKey.Verify() returns a bool and no error + ret, err := p.key.GetPublic().Verify(algocrypto.HashRep(message), sig[:]) + if err != nil { + panic(err) + } + return ret +} + +// PublicKey implements the identityChallengeSigner interface. +func (p *PeerIDChallengeSigner) PublicKey() algocrypto.PublicKey { + // libp2p Ed25519PublicKey.Raw() returns a 32-byte public key and no error + pub, err := p.key.GetPublic().Raw() + if len(pub) != len(algocrypto.PublicKey{}) { + panic(fmt.Sprintf("invalid public key length: %d", len(pub))) + } + if err != nil { + panic(err) + } + return algocrypto.PublicKey(pub) +} diff --git a/network/p2pNetwork.go b/network/p2pNetwork.go index 71188b2566..b5122516ff 100644 --- a/network/p2pNetwork.go +++ b/network/p2pNetwork.go @@ -59,6 +59,7 @@ type P2PNetwork struct { handler msgHandler broadcaster msgBroadcaster wsPeers map[peer.ID]*wsPeer + wsPeersToIDs map[*wsPeer]peer.ID wsPeersLock deadlock.RWMutex wsPeersChangeCounter atomic.Int32 wsPeersConnectivityCheckTicker *time.Ticker @@ -68,6 +69,13 @@ type p2pPeerStats struct { txReceived atomic.Uint64 } +type gossipSubPeer struct { + peerID peer.ID + net GossipNode +} + +func (p gossipSubPeer) GetNetwork() GossipNode { return p.net } + // NewP2PNetwork returns an instance of GossipNode that uses the p2p.Service func NewP2PNetwork(log logging.Logger, cfg config.Local, datadir string, phonebookAddresses []string, genesisID string, networkID protocol.NetworkID) (*P2PNetwork, error) { const readBufferLen = 2048 @@ -83,13 +91,14 @@ func NewP2PNetwork(log logging.Logger, cfg config.Local, datadir string, phonebo } net := &P2PNetwork{ - log: log, - config: cfg, - genesisID: genesisID, - networkID: networkID, - topicTags: map[protocol.Tag]string{"TX": p2p.TXTopicName}, - wsPeers: make(map[peer.ID]*wsPeer), - peerStats: make(map[peer.ID]*p2pPeerStats), + log: log, + config: cfg, + genesisID: genesisID, + networkID: networkID, + topicTags: map[protocol.Tag]string{"TX": p2p.TXTopicName}, + wsPeers: make(map[peer.ID]*wsPeer), + wsPeersToIDs: make(map[*wsPeer]peer.ID), + peerStats: make(map[peer.ID]*p2pPeerStats), } net.ctx, net.ctxCancel = context.WithCancel(context.Background()) net.handler = msgHandler{ @@ -126,6 +135,16 @@ func (n *P2PNetwork) setup() error { return nil } +// PeerID returns this node's peer ID. +func (n *P2PNetwork) PeerID() p2p.PeerID { + return p2p.PeerID(n.service.ID()) +} + +// PeerIDSigner returns an identityChallengeSigner that uses the libp2p peer ID's private key. +func (n *P2PNetwork) PeerIDSigner() identityChallengeSigner { + return n.service.IDSigner() +} + // Start threads, listen on sockets. func (n *P2PNetwork) Start() { n.wg.Add(1) @@ -176,6 +195,7 @@ func (n *P2PNetwork) innerStop() { n.log.Warnf("Error closing peer %s: %v", peerID, err) } delete(n.wsPeers, peerID) + delete(n.wsPeersToIDs, peer) } n.wsPeersLock.Unlock() closeGroup.Wait() @@ -244,27 +264,35 @@ func (n *P2PNetwork) Relay(ctx context.Context, tag protocol.Tag, data []byte, w } // Disconnect from a peer, probably due to protocol errors. -func (n *P2PNetwork) Disconnect(badnode Peer) { - node, ok := badnode.(peer.ID) - if !ok { - n.log.Warnf("Unknown peer type %T", badnode) - return - } +func (n *P2PNetwork) Disconnect(badpeer DisconnectablePeer) { + var peerID peer.ID + var wsp *wsPeer + n.wsPeersLock.Lock() defer n.wsPeersLock.Unlock() - if wsPeer, ok := n.wsPeers[node]; ok { - wsPeer.CloseAndWait(time.Now().Add(peerDisconnectionAckDuration)) - delete(n.wsPeers, node) + switch p := badpeer.(type) { + case gossipSubPeer: // Disconnect came from a message received via GossipSub + peerID, wsp = p.peerID, n.wsPeers[p.peerID] + case *wsPeer: // Disconnect came from a message received via wsPeer + peerID, wsp = n.wsPeersToIDs[p], p + default: + n.log.Warnf("Unknown peer type %T", badpeer) + return + } + if wsp != nil { + wsp.CloseAndWait(time.Now().Add(peerDisconnectionAckDuration)) + delete(n.wsPeers, peerID) + delete(n.wsPeersToIDs, wsp) } else { - n.log.Warnf("Could not find wsPeer reference for peer %s", node) + n.log.Warnf("Could not find wsPeer reference for peer %s", peerID) } - err := n.service.ClosePeer(node) + err := n.service.ClosePeer(peerID) if err != nil { - n.log.Warnf("Error disconnecting from peer %s: %v", node, err) + n.log.Warnf("Error disconnecting from peer %s: %v", peerID, err) } } -func (n *P2PNetwork) disconnectThread(badnode Peer, reason disconnectReason) { +func (n *P2PNetwork) disconnectThread(badnode DisconnectablePeer, reason disconnectReason) { defer n.wg.Done() n.Disconnect(badnode) // ignores reason } @@ -309,7 +337,7 @@ func (n *P2PNetwork) ClearHandlers() { } // GetRoundTripper returns a Transport that would limit the number of outgoing connections. -func (n *P2PNetwork) GetRoundTripper() http.RoundTripper { +func (n *P2PNetwork) GetRoundTripper(peer Peer) http.RoundTripper { return http.DefaultTransport } @@ -323,11 +351,6 @@ func (n *P2PNetwork) OnNetworkAdvance() {} // request that was provided to the http handler ( or provide a fallback Context() to that ) func (n *P2PNetwork) GetHTTPRequestConnection(request *http.Request) (conn net.Conn) { return nil } -// SubstituteGenesisID substitutes the "{genesisID}" with their network-specific genesisID. -func (n *P2PNetwork) SubstituteGenesisID(rawURL string) string { - return strings.Replace(rawURL, "{genesisID}", n.genesisID, -1) -} - // wsStreamHandler is a callback that the p2p package calls when a new peer connects and establishes a // stream for the websocket protocol. func (n *P2PNetwork) wsStreamHandler(ctx context.Context, peer peer.ID, stream network.Stream, incoming bool) { @@ -358,13 +381,14 @@ func (n *P2PNetwork) wsStreamHandler(ctx context.Context, peer peer.ID, stream n } // create a wsPeer for this stream and added it to the peers map. wsp := &wsPeer{ - wsPeerCore: makePeerCore(ctx, n, n.log, n.handler.readBuffer, addr, n.GetRoundTripper(), addr), + wsPeerCore: makePeerCore(ctx, n, n.log, n.handler.readBuffer, addr, n.GetRoundTripper(nil), addr), conn: &wsPeerConnP2PImpl{stream: stream}, outgoing: !incoming, } wsp.init(n.config, outgoingMessagesBufferSize) n.wsPeersLock.Lock() n.wsPeers[peer] = wsp + n.wsPeersToIDs[wsp] = peer n.wsPeersLock.Unlock() n.wsPeersChangeCounter.Add(1) } @@ -374,6 +398,7 @@ func (n *P2PNetwork) peerRemoteClose(peer *wsPeer, reason disconnectReason) { remotePeerID := peer.conn.(*wsPeerConnP2PImpl).stream.Conn().RemotePeer() n.wsPeersLock.Lock() delete(n.wsPeers, remotePeerID) + delete(n.wsPeersToIDs, peer) n.wsPeersLock.Unlock() n.wsPeersChangeCounter.Add(1) } @@ -438,7 +463,7 @@ func (n *P2PNetwork) txTopicHandleLoop() { // txTopicValidator calls txHandler to validate and process incoming transactions. func (n *P2PNetwork) txTopicValidator(ctx context.Context, peerID peer.ID, msg *pubsub.Message) pubsub.ValidationResult { inmsg := IncomingMessage{ - Sender: msg.ReceivedFrom, + Sender: gossipSubPeer{peerID: msg.ReceivedFrom, net: n}, Tag: protocol.TxnTag, Data: msg.Data, Net: n, @@ -446,7 +471,7 @@ func (n *P2PNetwork) txTopicValidator(ctx context.Context, peerID peer.ID, msg * } // if we sent the message, don't validate it - if inmsg.Sender == n.service.ID() { + if msg.ReceivedFrom == n.service.ID() { return pubsub.ValidationAccept } diff --git a/network/p2pNetwork_test.go b/network/p2pNetwork_test.go index 3b6d127596..75fc66b0ad 100644 --- a/network/p2pNetwork_test.go +++ b/network/p2pNetwork_test.go @@ -197,6 +197,10 @@ func (s *mockService) ID() peer.ID { return s.id } +func (s *mockService) IDSigner() *p2p.PeerIDChallengeSigner { + panic("not implemented") +} + func (s *mockService) AddrInfo() peer.AddrInfo { return peer.AddrInfo{ ID: s.id, diff --git a/network/wsNetwork.go b/network/wsNetwork.go index 05e7ba44ca..f6978dc3ac 100644 --- a/network/wsNetwork.go +++ b/network/wsNetwork.go @@ -44,6 +44,7 @@ import ( "github.com/algorand/go-algorand/logging" "github.com/algorand/go-algorand/logging/telemetryspec" "github.com/algorand/go-algorand/network/limitlistener" + "github.com/algorand/go-algorand/network/p2p" "github.com/algorand/go-algorand/protocol" tools_network "github.com/algorand/go-algorand/tools/network" "github.com/algorand/go-algorand/tools/network/dnssec" @@ -195,6 +196,9 @@ type WebsocketNetwork struct { NetworkID protocol.NetworkID RandomID string + peerID p2p.PeerID + peerIDSigner identityChallengeSigner + ready atomic.Int32 readyChan chan struct{} @@ -340,7 +344,7 @@ type networkPeerManager interface { // used by msgHandler Broadcast(ctx context.Context, tag protocol.Tag, data []byte, wait bool, except Peer) error - disconnectThread(badnode Peer, reason disconnectReason) + disconnectThread(badnode DisconnectablePeer, reason disconnectReason) checkPeersConnectivity() } @@ -455,13 +459,13 @@ func (wn *WebsocketNetwork) RelayArray(ctx context.Context, tags []protocol.Tag, return nil } -func (wn *WebsocketNetwork) disconnectThread(badnode Peer, reason disconnectReason) { +func (wn *WebsocketNetwork) disconnectThread(badnode DisconnectablePeer, reason disconnectReason) { defer wn.wg.Done() wn.disconnect(badnode, reason) } // Disconnect from a peer, probably due to protocol errors. -func (wn *WebsocketNetwork) Disconnect(node Peer) { +func (wn *WebsocketNetwork) Disconnect(node DisconnectablePeer) { wn.disconnect(node, disconnectBadData) } @@ -543,14 +547,14 @@ func (wn *WebsocketNetwork) GetPeers(options ...PeerOption) []Peer { var addrs []string addrs = wn.phonebook.GetAddresses(1000, PhoneBookEntryRelayRole) for _, addr := range addrs { - peerCore := makePeerCore(wn.ctx, wn, wn.log, wn.handler.readBuffer, addr, wn.GetRoundTripper(), "" /*origin address*/) + peerCore := makePeerCore(wn.ctx, wn, wn.log, wn.handler.readBuffer, addr, wn.GetRoundTripper(nil), "" /*origin address*/) outPeers = append(outPeers, &peerCore) } case PeersPhonebookArchivalNodes: var addrs []string addrs = wn.phonebook.GetAddresses(1000, PhoneBookEntryRelayRole) for _, addr := range addrs { - peerCore := makePeerCore(wn.ctx, wn, wn.log, wn.handler.readBuffer, addr, wn.GetRoundTripper(), "" /*origin address*/) + peerCore := makePeerCore(wn.ctx, wn, wn.log, wn.handler.readBuffer, addr, wn.GetRoundTripper(nil), "" /*origin address*/) outPeers = append(outPeers, &peerCore) } case PeersPhonebookArchivers: @@ -558,7 +562,7 @@ func (wn *WebsocketNetwork) GetPeers(options ...PeerOption) []Peer { var addrs []string addrs = wn.phonebook.GetAddresses(1000, PhoneBookEntryArchiverRole) for _, addr := range addrs { - peerCore := makePeerCore(wn.ctx, wn, wn.log, wn.handler.readBuffer, addr, wn.GetRoundTripper(), "" /*origin address*/) + peerCore := makePeerCore(wn.ctx, wn, wn.log, wn.handler.readBuffer, addr, wn.GetRoundTripper(nil), "" /*origin address*/) outPeers = append(outPeers, &peerCore) } case PeersConnectedIn: @@ -718,12 +722,17 @@ func (wn *WebsocketNetwork) Start() { } } } - // if the network has a public address, use that as the name for connection deduplication - if wn.config.PublicAddress != "" { + // if the network has a public address or a libp2p peer ID, use that as the name for connection deduplication + if wn.config.PublicAddress != "" || (wn.peerID != "" && wn.peerIDSigner != nil) { wn.RegisterHandlers(identityHandlers) } - if wn.identityScheme == nil && wn.config.PublicAddress != "" { - wn.identityScheme = NewIdentityChallengeScheme(wn.config.PublicAddress) + if wn.identityScheme == nil { + if wn.peerID != "" && wn.peerIDSigner != nil { + wn.identityScheme = NewIdentityChallengeSchemeWithSigner(string(wn.peerID), wn.peerIDSigner) + } + if wn.config.PublicAddress != "" { + wn.identityScheme = NewIdentityChallengeScheme(wn.config.PublicAddress) + } } wn.meshUpdateRequests <- meshRequest{false, nil} @@ -1074,7 +1083,7 @@ func (wn *WebsocketNetwork) ServeHTTP(response http.ResponseWriter, request *htt } peer := &wsPeer{ - wsPeerCore: makePeerCore(wn.ctx, wn, wn.log, wn.handler.readBuffer, trackedRequest.otherPublicAddr, wn.GetRoundTripper(), trackedRequest.remoteHost), + wsPeerCore: makePeerCore(wn.ctx, wn, wn.log, wn.handler.readBuffer, trackedRequest.otherPublicAddr, wn.GetRoundTripper(nil), trackedRequest.remoteHost), conn: wsPeerWebsocketConnImpl{conn}, outgoing: false, InstanceName: trackedRequest.otherInstanceName, @@ -2009,7 +2018,7 @@ func (wn *WebsocketNetwork) numOutgoingPending() int { // GetRoundTripper returns an http.Transport that limits the number of connection // to comply with connectionsRateLimitingCount. -func (wn *WebsocketNetwork) GetRoundTripper() http.RoundTripper { +func (wn *WebsocketNetwork) GetRoundTripper(peer Peer) http.RoundTripper { return &wn.transport } @@ -2148,7 +2157,7 @@ func (wn *WebsocketNetwork) tryConnect(addr, gossipAddr string) { } peer := &wsPeer{ - wsPeerCore: makePeerCore(wn.ctx, wn, wn.log, wn.handler.readBuffer, addr, wn.GetRoundTripper(), "" /* origin */), + wsPeerCore: makePeerCore(wn.ctx, wn, wn.log, wn.handler.readBuffer, addr, wn.GetRoundTripper(nil), "" /* origin */), conn: wsPeerWebsocketConnImpl{conn}, outgoing: true, incomingMsgFilter: wn.incomingMsgFilter, @@ -2237,7 +2246,7 @@ func (wn *WebsocketNetwork) SetPeerData(peer Peer, key string, value interface{} } // NewWebsocketNetwork constructor for websockets based gossip network -func NewWebsocketNetwork(log logging.Logger, config config.Local, phonebookAddresses []string, genesisID string, networkID protocol.NetworkID, nodeInfo NodeInfo) (wn *WebsocketNetwork, err error) { +func NewWebsocketNetwork(log logging.Logger, config config.Local, phonebookAddresses []string, genesisID string, networkID protocol.NetworkID, nodeInfo NodeInfo, peerID p2p.PeerID, idSigner identityChallengeSigner) (wn *WebsocketNetwork, err error) { phonebook := MakePhonebook(config.ConnectionsRateLimitingCount, time.Duration(config.ConnectionsRateLimitingWindowSeconds)*time.Second) phonebook.AddPersistentPeers(phonebookAddresses, string(networkID), PhoneBookEntryRelayRole) @@ -2248,6 +2257,8 @@ func NewWebsocketNetwork(log logging.Logger, config config.Local, phonebookAddre GenesisID: genesisID, NetworkID: networkID, nodeInfo: nodeInfo, + peerID: peerID, + peerIDSigner: idSigner, resolveSRVRecords: tools_network.ReadFromSRV, } @@ -2257,7 +2268,7 @@ func NewWebsocketNetwork(log logging.Logger, config config.Local, phonebookAddre // NewWebsocketGossipNode constructs a websocket network node and returns it as a GossipNode interface implementation func NewWebsocketGossipNode(log logging.Logger, config config.Local, phonebookAddresses []string, genesisID string, networkID protocol.NetworkID) (gn GossipNode, err error) { - return NewWebsocketNetwork(log, config, phonebookAddresses, genesisID, networkID, nil) + return NewWebsocketNetwork(log, config, phonebookAddresses, genesisID, networkID, nil, "", nil) } // SetPrioScheme specifies the network priority scheme for a network node @@ -2478,7 +2489,5 @@ func (wn *WebsocketNetwork) postMessagesOfInterestThread() { } } -// SubstituteGenesisID substitutes the "{genesisID}" with their network-specific genesisID. -func (wn *WebsocketNetwork) SubstituteGenesisID(rawURL string) string { - return strings.Replace(rawURL, "{genesisID}", wn.GenesisID, -1) -} +// GetGenesisID returns the network-specific genesisID. +func (wn *WebsocketNetwork) GetGenesisID() string { return wn.GenesisID } diff --git a/network/wsNetwork_test.go b/network/wsNetwork_test.go index 445ede3dc3..710ac3f532 100644 --- a/network/wsNetwork_test.go +++ b/network/wsNetwork_test.go @@ -1715,7 +1715,7 @@ func TestPeeringWithBadIdentityChallenge(t *testing.T) { attachChallenge: func(attach http.Header, addr string) identityChallengeValue { s := NewIdentityChallengeScheme("does not matter") // make a scheme to use its keys c := identityChallenge{ - Key: s.identityKeys.SignatureVerifier, + Key: s.identityKeys.PublicKey(), Challenge: newIdentityChallengeValue(), PublicAddress: []byte("incorrect address!"), } @@ -1733,7 +1733,7 @@ func TestPeeringWithBadIdentityChallenge(t *testing.T) { attachChallenge: func(attach http.Header, addr string) identityChallengeValue { s := NewIdentityChallengeScheme("does not matter") // make a scheme to use its keys c := identityChallenge{ - Key: s.identityKeys.SignatureVerifier, + Key: s.identityKeys.PublicKey(), Challenge: newIdentityChallengeValue(), PublicAddress: []byte("incorrect address!"), }.Sign(s.identityKeys) @@ -1853,7 +1853,7 @@ func TestPeeringWithBadIdentityChallengeResponse(t *testing.T) { protocol.Decode(msg, &idChal) // make the response object, with an incorrect challenge encode it and attach it to the header r := identityChallengeResponse{ - Key: s.identityKeys.SignatureVerifier, + Key: s.identityKeys.PublicKey(), Challenge: newIdentityChallengeValue(), ResponseChallenge: newIdentityChallengeValue(), } @@ -1876,7 +1876,7 @@ func TestPeeringWithBadIdentityChallengeResponse(t *testing.T) { protocol.Decode(msg, &idChal) // make the response object, then change the signature and encode and attach r := identityChallengeResponse{ - Key: s.identityKeys.SignatureVerifier, + Key: s.identityKeys.PublicKey(), Challenge: newIdentityChallengeValue(), ResponseChallenge: newIdentityChallengeValue(), }.Sign(s.identityKeys) diff --git a/network/wsPeer.go b/network/wsPeer.go index 9daf7b0ece..5cd0cc2b1c 100644 --- a/network/wsPeer.go +++ b/network/wsPeer.go @@ -364,6 +364,10 @@ func (wp *wsPeerCore) GetHTTPClient() *http.Client { return &wp.client } +func (wp *wsPeerCore) GetNetwork() GossipNode { + return wp.net +} + // Version returns the matching version from network.SupportedProtocolVersions func (wp *wsPeer) Version() string { return wp.version diff --git a/node/follower_node.go b/node/follower_node.go index 66790b1291..5d0887b61f 100644 --- a/node/follower_node.go +++ b/node/follower_node.go @@ -94,7 +94,7 @@ func MakeFollower(log logging.Logger, rootDir string, cfg config.Local, phoneboo node.config = cfg // tie network, block fetcher, and agreement services together - p2pNode, err := network.NewWebsocketNetwork(node.log, node.config, phonebookAddresses, genesis.ID(), genesis.Network, nil) + p2pNode, err := network.NewWebsocketNetwork(node.log, node.config, phonebookAddresses, genesis.ID(), genesis.Network, nil, "", nil) if err != nil { log.Errorf("could not create websocket node: %v", err) return nil, err diff --git a/node/node.go b/node/node.go index b637eda32e..0241fcda5a 100644 --- a/node/node.go +++ b/node/node.go @@ -211,7 +211,13 @@ func MakeFull(log logging.Logger, rootDir string, cfg config.Local, phonebookAdd // tie network, block fetcher, and agreement services together var p2pNode network.GossipNode - if cfg.EnableP2P { + if cfg.EnableP2PHybridMode { + p2pNode, err = network.NewHybridP2PNetwork(node.log, node.config, node.genesisDirs.RootGenesisDir, phonebookAddresses, genesis.ID(), genesis.Network, node) + if err != nil { + log.Errorf("could not create hybrid p2p node: %v", err) + return nil, err + } + } else if cfg.EnableP2P { // TODO: pass more appropriate genesisDir (hot/cold). Presently this is just used to store a peerID key. p2pNode, err = network.NewP2PNetwork(node.log, node.config, node.genesisDirs.RootGenesisDir, phonebookAddresses, genesis.ID(), genesis.Network) if err != nil { @@ -220,7 +226,7 @@ func MakeFull(log logging.Logger, rootDir string, cfg config.Local, phonebookAdd } } else { var wsNode *network.WebsocketNetwork - wsNode, err = network.NewWebsocketNetwork(node.log, node.config, phonebookAddresses, genesis.ID(), genesis.Network, node) + wsNode, err = network.NewWebsocketNetwork(node.log, node.config, phonebookAddresses, genesis.ID(), genesis.Network, node, "", nil) if err != nil { log.Errorf("could not create websocket node: %v", err) return nil, err diff --git a/rpcs/blockService.go b/rpcs/blockService.go index a3bf886f2b..91bdceea09 100644 --- a/rpcs/blockService.go +++ b/rpcs/blockService.go @@ -497,7 +497,7 @@ func RawBlockBytes(l LedgerForBlockService, round basics.Round) ([]byte, error) // FormatBlockQuery formats a block request query for the given network and round number func FormatBlockQuery(round uint64, parsedURL string, net network.GossipNode) string { - return net.SubstituteGenesisID(path.Join(parsedURL, "/v1/{genesisID}/block/"+strconv.FormatUint(uint64(round), 36))) + return network.SubstituteGenesisID(net, path.Join(parsedURL, "/v1/{genesisID}/block/"+strconv.FormatUint(uint64(round), 36))) } func makeFallbackEndpoints(log logging.Logger, customFallbackEndpoints string) (fe fallbackEndpoints) { diff --git a/rpcs/blockService_test.go b/rpcs/blockService_test.go index 832b59c55b..5c339ec895 100644 --- a/rpcs/blockService_test.go +++ b/rpcs/blockService_test.go @@ -70,6 +70,10 @@ func (mup *mockUnicastPeer) Respond(ctx context.Context, reqMsg network.Incoming return nil } +func (mup *mockUnicastPeer) GetNetwork() network.GossipNode { + panic("not implemented") +} + // TestHandleCatchupReqNegative covers the error reporting in handleCatchupReq func TestHandleCatchupReqNegative(t *testing.T) { partitiontest.PartitionTest(t) @@ -141,6 +145,8 @@ func TestRedirectFallbackArchiver(t *testing.T) { net1 := &httpTestPeerSource{} net2 := &httpTestPeerSource{} + net1.GenesisID = "test-genesis-ID" + net2.GenesisID = "test-genesis-ID" config := config.GetDefaultLocal() // Need to enable block service fallbacks @@ -249,6 +255,8 @@ func TestRedirectFallbackEndpoints(t *testing.T) { net1 := &httpTestPeerSource{} net2 := &httpTestPeerSource{} + net1.GenesisID = "test-genesis-ID" + net2.GenesisID = "test-genesis-ID" nodeA := &basicRPCNode{} nodeB := &basicRPCNode{} @@ -261,8 +269,8 @@ func TestRedirectFallbackEndpoints(t *testing.T) { // Set the first to a bad address, the second to self, and the third to the one that has the block. // If RR is right, should succeed. config.BlockServiceCustomFallbackEndpoints = fmt.Sprintf("://badaddress,%s,%s", nodeA.rootURL(), nodeB.rootURL()) - bs1 := MakeBlockService(log, config, ledger1, net1, "{genesisID}") - bs2 := MakeBlockService(log, config, ledger2, net2, "{genesisID}") + bs1 := MakeBlockService(log, config, ledger1, net1, "test-genesis-ID") + bs2 := MakeBlockService(log, config, ledger2, net2, "test-genesis-ID") nodeA.RegisterHTTPHandler(BlockServiceBlockPath, bs1) nodeB.RegisterHTTPHandler(BlockServiceBlockPath, bs2) @@ -312,6 +320,8 @@ func TestRedirectOnFullCapacity(t *testing.T) { net1 := &httpTestPeerSource{} net2 := &httpTestPeerSource{} + net1.GenesisID = "test-genesis-ID" + net2.GenesisID = "test-genesis-ID" config := config.GetDefaultLocal() // Need to enable block service fallbacks @@ -490,12 +500,13 @@ func TestRedirectExceptions(t *testing.T) { addBlock(t, ledger1) net1 := &httpTestPeerSource{} + net1.GenesisID = "test-genesis-ID" config := config.GetDefaultLocal() // Need to enable block service fallbacks config.EnableBlockServiceFallbackToArchiver = true - bs1 := MakeBlockService(log, config, ledger1, net1, "{genesisID}") + bs1 := MakeBlockService(log, config, ledger1, net1, "test-genesis-ID") nodeA := &basicRPCNode{} diff --git a/rpcs/httpTxSync.go b/rpcs/httpTxSync.go index 6f42eeaacf..13d476ad77 100644 --- a/rpcs/httpTxSync.go +++ b/rpcs/httpTxSync.go @@ -107,14 +107,14 @@ func (hts *HTTPTxSync) Sync(ctx context.Context, bloom *bloom.Filter) (txgroups client := hpeer.GetHTTPClient() if client == nil { client = &http.Client{} - client.Transport = hts.peers.GetRoundTripper() + client.Transport = hts.peers.GetRoundTripper(peer) } parsedURL, err := network.ParseHostOrURL(hts.rootURL) if err != nil { hts.log.Warnf("txSync bad url %v: %s", hts.rootURL, err) return nil, err } - parsedURL.Path = hts.peers.SubstituteGenesisID(path.Join(parsedURL.Path, TxServiceHTTPPath)) + parsedURL.Path = network.SubstituteGenesisID(hts.peers, path.Join(parsedURL.Path, TxServiceHTTPPath)) syncURL := parsedURL.String() hts.log.Infof("http sync from %s", syncURL) params := url.Values{} diff --git a/rpcs/txService_test.go b/rpcs/txService_test.go index 8ef49e45a6..1c28ea80ee 100644 --- a/rpcs/txService_test.go +++ b/rpcs/txService_test.go @@ -22,7 +22,6 @@ import ( "net/http" "net/url" "os" - "strings" "sync" "testing" "time" @@ -116,9 +115,7 @@ func (b *basicRPCNode) GetPeers(options ...network.PeerOption) []network.Peer { return b.peers } -func (b *basicRPCNode) SubstituteGenesisID(rawURL string) string { - return strings.Replace(rawURL, "{genesisID}", "test genesisID", -1) -} +func (b *basicRPCNode) GetGenesisID() string { return "test genesisID" } func nodePair() (*basicRPCNode, *basicRPCNode) { nodeA := &basicRPCNode{} diff --git a/rpcs/txSyncer_test.go b/rpcs/txSyncer_test.go index 43e85f4523..6d8928c3fe 100644 --- a/rpcs/txSyncer_test.go +++ b/rpcs/txSyncer_test.go @@ -22,7 +22,6 @@ import ( "math/rand" "net/http" "net/rpc" - "strings" "sync/atomic" "testing" "time" @@ -170,9 +169,6 @@ type mockClientAggregator struct { func (mca *mockClientAggregator) GetPeers(options ...network.PeerOption) []network.Peer { return mca.peers } -func (mca *mockClientAggregator) SubstituteGenesisID(rawURL string) string { - return strings.Replace(rawURL, "{genesisID}", "test genesisID", -1) -} const numberOfPeers = 10 @@ -283,7 +279,7 @@ func TestSync(t *testing.T) { runner := mockRunner{failWithNil: false, failWithError: false, txgroups: pool.PendingTxGroups()[len(pool.PendingTxGroups())-1:], done: make(chan *rpc.Call)} client := mockRPCClient{client: &runner, rootURL: nodeAURL, log: logging.TestingLog(t)} - clientAgg := mockClientAggregator{peers: []network.Peer{&client}} + clientAgg := mockClientAggregator{peers: []network.Peer{&client}, MockNetwork: mocks.MockNetwork{GenesisID: "test genesisID"}} handler := mockHandler{} syncerPool := makeMockPendingTxAggregate(3) syncer := MakeTxSyncer(syncerPool, &clientAgg, &handler, testSyncInterval, testSyncTimeout, config.GetDefaultLocal().TxSyncServeResponseSize) @@ -322,7 +318,7 @@ func TestStartAndStop(t *testing.T) { runner := mockRunner{failWithNil: false, failWithError: false, txgroups: pool.PendingTxGroups()[len(pool.PendingTxGroups())-1:], done: make(chan *rpc.Call)} client := mockRPCClient{client: &runner, rootURL: nodeAURL, log: logging.TestingLog(t)} - clientAgg := mockClientAggregator{peers: []network.Peer{&client}} + clientAgg := mockClientAggregator{peers: []network.Peer{&client}, MockNetwork: mocks.MockNetwork{GenesisID: "test genesisID"}} handler := mockHandler{} syncerPool := makeMockPendingTxAggregate(0) diff --git a/test/testdata/configs/config-v31.json b/test/testdata/configs/config-v31.json index b4de6dd5df..1179a5b629 100644 --- a/test/testdata/configs/config-v31.json +++ b/test/testdata/configs/config-v31.json @@ -55,6 +55,7 @@ "EnableMetricReporting": false, "EnableOutgoingNetworkMessageFiltering": true, "EnableP2P": false, + "EnableP2PHybridMode": false, "EnablePingHandler": true, "EnableProcessBlockStats": false, "EnableProfiler": false, @@ -96,6 +97,7 @@ "OptimizeAccountsDatabaseOnStartup": false, "OutgoingMessageFilterBucketCount": 3, "OutgoingMessageFilterBucketSize": 128, + "P2PListenAddress": "", "P2PPersistPeerID": false, "P2PPrivateKeyLocation": "", "ParticipationKeysRefreshInterval": 60000000000,