Skip to content

Commit

Permalink
p2p: reuse existing libp2p.Host for http clients (#6129)
Browse files Browse the repository at this point in the history
  • Loading branch information
algorandskiy authored Sep 16, 2024
1 parent 2b6e018 commit 619d257
Show file tree
Hide file tree
Showing 5 changed files with 65 additions and 18 deletions.
51 changes: 41 additions & 10 deletions network/p2p/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,13 +78,44 @@ func (s *HTTPServer) RegisterHTTPHandlerFunc(path string, handler func(http.Resp
})
}

// MakeHTTPClient creates a http.Client that uses libp2p transport for a given protocol and peer address.
func MakeHTTPClient(addrInfo *peer.AddrInfo) (*http.Client, error) {
clientStreamHost, err := libp2p.New(libp2p.NoListenAddrs)
if err != nil {
return nil, err
type httpClientConfig struct {
host host.Host
}

type httpClientOption func(*httpClientConfig)

// WithHost sets the libp2p host for the http client.
func WithHost(h host.Host) httpClientOption {
return func(o *httpClientConfig) {
o.host = h
}
}

// MakeTestHTTPClient creates a http.Client that uses libp2p transport for a given protocol and peer address.
// This exported method is only used in tests.
func MakeTestHTTPClient(addrInfo *peer.AddrInfo, opts ...httpClientOption) (*http.Client, error) {
return makeHTTPClient(addrInfo, opts...)
}

// makeHTTPClient creates a http.Client that uses libp2p transport for a given protocol and peer address.
// If service is nil, a new libp2p host is created.
func makeHTTPClient(addrInfo *peer.AddrInfo, opts ...httpClientOption) (*http.Client, error) {
var config httpClientConfig
for _, opt := range opts {
opt(&config)
}

var clientStreamHost host.Host
if config.host != nil {
clientStreamHost = config.host
} else {
var err error
clientStreamHost, err = libp2p.New(libp2p.NoListenAddrs)
if err != nil {
return nil, err
}
logging.Base().Debugf("MakeHTTPClient made a new P2P host %s for %s", clientStreamHost.ID(), addrInfo.String())
}
logging.Base().Debugf("MakeHTTPClient made a new P2P host %s for %s", clientStreamHost.ID(), addrInfo.String())

client := libp2phttp.Host{StreamHost: clientStreamHost}

Expand All @@ -100,13 +131,13 @@ func MakeHTTPClient(addrInfo *peer.AddrInfo) (*http.Client, error) {
return &http.Client{Transport: rt}, nil
}

// MakeHTTPClientWithRateLimit creates a http.Client that uses libp2p transport for a given protocol and peer address.
func MakeHTTPClientWithRateLimit(addrInfo *peer.AddrInfo, pstore limitcaller.ConnectionTimeStore, queueingTimeout time.Duration) (*http.Client, error) {
cl, err := MakeHTTPClient(addrInfo)
// makeHTTPClientWithRateLimit creates a http.Client that uses libp2p transport for a given protocol and peer address.
func makeHTTPClientWithRateLimit(addrInfo *peer.AddrInfo, p2pService *serviceImpl, connTimeStore limitcaller.ConnectionTimeStore, queueingTimeout time.Duration) (*http.Client, error) {
cl, err := makeHTTPClient(addrInfo, WithHost(p2pService.host))
if err != nil {
return nil, err
}
rltr := limitcaller.MakeRateLimitingBoundTransportWithRoundTripper(pstore, queueingTimeout, cl.Transport, string(addrInfo.ID))
rltr := limitcaller.MakeRateLimitingBoundTransportWithRoundTripper(connTimeStore, queueingTimeout, cl.Transport, string(addrInfo.ID))
cl.Transport = &rltr
return cl, nil

Expand Down
10 changes: 10 additions & 0 deletions network/p2p/p2p.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,14 @@ import (
"encoding/base32"
"fmt"
"net"
"net/http"
"runtime"
"strings"
"time"

"github.com/algorand/go-algorand/config"
"github.com/algorand/go-algorand/logging"
"github.com/algorand/go-algorand/network/limitcaller"
pstore "github.com/algorand/go-algorand/network/p2p/peerstore"
"github.com/algorand/go-algorand/network/phonebook"
"github.com/algorand/go-algorand/util/metrics"
Expand Down Expand Up @@ -69,6 +71,9 @@ type Service interface {
ListPeersForTopic(topic string) []peer.ID
Subscribe(topic string, val pubsub.ValidatorEx) (SubNextCancellable, error)
Publish(ctx context.Context, topic string, data []byte) error

// GetHTTPClient returns a rate-limiting libp2p-streaming http client that can be used to make requests to the given peer
GetHTTPClient(addrInfo *peer.AddrInfo, connTimeStore limitcaller.ConnectionTimeStore, queueingTimeout time.Duration) (*http.Client, error)
}

// serviceImpl manages integration with libp2p and implements the Service interface
Expand Down Expand Up @@ -412,3 +417,8 @@ func addressFilter(addrs []multiaddr.Multiaddr) []multiaddr.Multiaddr {
}
return res
}

// GetHTTPClient returns a libp2p-streaming http client that can be used to make requests to the given peer
func (s *serviceImpl) GetHTTPClient(addrInfo *peer.AddrInfo, connTimeStore limitcaller.ConnectionTimeStore, queueingTimeout time.Duration) (*http.Client, error) {
return makeHTTPClientWithRateLimit(addrInfo, s, connTimeStore, queueingTimeout)
}
2 changes: 1 addition & 1 deletion network/p2p/testing/httpNode.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ func (p httpPeer) GetAddress() string {

// GetAddress implements HTTPPeer interface and returns the http client for a peer
func (p httpPeer) GetHTTPClient() *http.Client {
c, err := p2p.MakeHTTPClient(&p.addrInfo)
c, err := p2p.MakeTestHTTPClient(&p.addrInfo)
require.NoError(p.tb, err)
return c
}
Expand Down
6 changes: 3 additions & 3 deletions network/p2pNetwork.go
Original file line number Diff line number Diff line change
Expand Up @@ -612,7 +612,7 @@ func addrInfoToWsPeerCore(n *P2PNetwork, addrInfo *peer.AddrInfo) (wsPeerCore, b
}
addr := mas[0].String()

client, err := p2p.MakeHTTPClientWithRateLimit(addrInfo, n.pstore, limitcaller.DefaultQueueingTimeout)
client, err := n.service.GetHTTPClient(addrInfo, n.pstore, limitcaller.DefaultQueueingTimeout)
if err != nil {
n.log.Warnf("MakeHTTPClient failed: %v", err)
return wsPeerCore{}, false
Expand Down Expand Up @@ -718,7 +718,7 @@ func (n *P2PNetwork) GetHTTPClient(address string) (*http.Client, error) {
if err != nil {
return nil, err
}
return p2p.MakeHTTPClientWithRateLimit(addrInfo, n.pstore, limitcaller.DefaultQueueingTimeout)
return n.service.GetHTTPClient(addrInfo, n.pstore, limitcaller.DefaultQueueingTimeout)
}

// OnNetworkAdvance notifies the network library that the agreement protocol was able to make a notable progress.
Expand Down Expand Up @@ -771,7 +771,7 @@ func (n *P2PNetwork) wsStreamHandler(ctx context.Context, p2pPeer peer.ID, strea

// create a wsPeer for this stream and added it to the peers map.
addrInfo := &peer.AddrInfo{ID: p2pPeer, Addrs: []multiaddr.Multiaddr{ma}}
client, err := p2p.MakeHTTPClientWithRateLimit(addrInfo, n.pstore, limitcaller.DefaultQueueingTimeout)
client, err := n.service.GetHTTPClient(addrInfo, n.pstore, limitcaller.DefaultQueueingTimeout)
if err != nil {
n.log.Warnf("Cannot construct HTTP Client for %s: %v", p2pPeer, err)
client = nil
Expand Down
14 changes: 10 additions & 4 deletions network/p2pNetwork_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -368,6 +368,10 @@ func (s *mockService) Publish(ctx context.Context, topic string, data []byte) er
return nil
}

func (s *mockService) GetHTTPClient(addrInfo *peer.AddrInfo, connTimeStore limitcaller.ConnectionTimeStore, queueingTimeout time.Duration) (*http.Client, error) {
return nil, nil
}

func makeMockService(id peer.ID, addrs []ma.Multiaddr) *mockService {
return &mockService{
id: id,
Expand Down Expand Up @@ -757,7 +761,7 @@ func TestP2PHTTPHandler(t *testing.T) {
require.NoError(t, err)
require.NotZero(t, addrsA[0])

httpClient, err := p2p.MakeHTTPClient(&peerInfoA)
httpClient, err := p2p.MakeTestHTTPClient(&peerInfoA)
require.NoError(t, err)
resp, err := httpClient.Get("/test")
require.NoError(t, err)
Expand All @@ -768,7 +772,7 @@ func TestP2PHTTPHandler(t *testing.T) {
require.Equal(t, "hello", string(body))

// check another endpoint that also access the underlying connection/stream
httpClient, err = p2p.MakeHTTPClient(&peerInfoA)
httpClient, err = p2p.MakeTestHTTPClient(&peerInfoA)
require.NoError(t, err)
resp, err = httpClient.Get("/check-conn")
require.NoError(t, err)
Expand All @@ -780,10 +784,12 @@ func TestP2PHTTPHandler(t *testing.T) {

// check rate limiting client:
// zero clients allowed, rate limiting window (10s) is greater than queue deadline (1s)
netB, err := NewP2PNetwork(log, cfg, "", nil, genesisID, config.Devtestnet, &nopeNodeInfo{}, nil)
require.NoError(t, err)
pstore, err := peerstore.MakePhonebook(0, 10*time.Second)
require.NoError(t, err)
pstore.AddPersistentPeers([]*peer.AddrInfo{&peerInfoA}, "net", phonebook.PhoneBookEntryRelayRole)
httpClient, err = p2p.MakeHTTPClientWithRateLimit(&peerInfoA, pstore, 1*time.Second)
httpClient, err = netB.service.GetHTTPClient(&peerInfoA, pstore, 1*time.Second)
require.NoError(t, err)
_, err = httpClient.Get("/test")
require.ErrorIs(t, err, limitcaller.ErrConnectionQueueingTimeout)
Expand Down Expand Up @@ -815,7 +821,7 @@ func TestP2PHTTPHandlerAllInterfaces(t *testing.T) {
require.NotZero(t, addrsB[0])

t.Logf("peerInfoB: %s", peerInfoA)
httpClient, err := p2p.MakeHTTPClient(&peerInfoA)
httpClient, err := p2p.MakeTestHTTPClient(&peerInfoA)
require.NoError(t, err)
resp, err := httpClient.Get("/test")
require.NoError(t, err)
Expand Down

0 comments on commit 619d257

Please sign in to comment.