Skip to content

Commit

Permalink
feat(node): make nats optional
Browse files Browse the repository at this point in the history
  • Loading branch information
namn-grg committed May 28, 2024
1 parent 371c9d8 commit cb6ec57
Show file tree
Hide file tree
Showing 7 changed files with 97 additions and 83 deletions.
6 changes: 5 additions & 1 deletion config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ type DiscConfig struct {
ForkDigest [4]byte
LogPath string
Bootnodes []*enode.Node
NatsURL string
}

var DefaultDiscConfig DiscConfig = DiscConfig{
Expand All @@ -64,7 +65,7 @@ var DefaultDiscConfig DiscConfig = DiscConfig{
TCP: 9000,
DBPath: "",
ForkDigest: [4]byte{0x6a, 0x95, 0xa1, 0xa9},
LogPath: "nodes.log",
LogPath: "peer_discovered.log",
Bootnodes: GetEthereumBootnodes(),
}

Expand Down Expand Up @@ -102,6 +103,8 @@ type NodeConfig struct {
ConcurrentDialers int
IP string
Port int
NatsURL string
LogPath string
}

var DefaultNodeConfig NodeConfig = NodeConfig{
Expand All @@ -113,4 +116,5 @@ var DefaultNodeConfig NodeConfig = NodeConfig{
ConcurrentDialers: 64,
IP: "0.0.0.0",
Port: 9000,
LogPath: "metadata_received.log",
}
3 changes: 2 additions & 1 deletion discovery/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ type Discovery struct {
node *ethereum.Node
}

func NewDiscovery() (*Discovery, error) {
func NewDiscovery(natsURL string) (*Discovery, error) {
var privBytes []byte

key, err := ecdsa.GenerateKey(gcrypto.S256(), rand.Reader)
Expand All @@ -33,6 +33,7 @@ func NewDiscovery() (*Discovery, error) {
nodeConfig := &config.DefaultNodeConfig
nodeConfig.PrivateKey = privateKey
nodeConfig.BeaconConfig = params.MainnetConfig()
nodeConfig.NatsURL = natsURL

n, err := ethereum.NewNode(nodeConfig)

Expand Down
14 changes: 11 additions & 3 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (

type Config struct {
logLevel string
natsURL string
}

func main() {
Expand All @@ -31,7 +32,7 @@ func main() {
level, _ := zerolog.ParseLevel(cfg.logLevel)
zerolog.SetGlobalLevel(level)

runSentry()
runSentry(cfg.natsURL)
return nil
},
},
Expand All @@ -55,6 +56,13 @@ func main() {
Value: "info",
Destination: &cfg.logLevel,
},
&cli.StringFlag{
Name: "nats",
Usage: "natsJS server url",
Aliases: []string{"n"},
Value: os.Getenv("NATS_URL"), // If nil, sentry will run without NATS
Destination: &cfg.natsURL,
},
},
}

Expand All @@ -64,8 +72,8 @@ func main() {

}

func runSentry() {
disc, err := discovery.NewDiscovery()
func runSentry(natsURL string) {
disc, err := discovery.NewDiscovery(natsURL)
if err != nil {
panic(err)
}
Expand Down
47 changes: 5 additions & 42 deletions pkg/ethereum/discv5.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import (
"net"
"os"
"sync"
"time"

"github.com/chainbound/valtrack/config"
"github.com/chainbound/valtrack/log"
Expand All @@ -21,7 +20,6 @@ import (
"github.com/ethereum/go-ethereum/p2p/enode"
"github.com/ethereum/go-ethereum/p2p/enr"
ma "github.com/multiformats/go-multiaddr"
"github.com/nats-io/nats.go"
"github.com/nats-io/nats.go/jetstream"
"github.com/pkg/errors"
)
Expand Down Expand Up @@ -55,30 +53,9 @@ type DiscoveryV5 struct {
}

func NewDiscoveryV5(pk *ecdsa.PrivateKey, discConfig *config.DiscConfig) (*DiscoveryV5, error) {
url := os.Getenv("NATS_URL")
if url == "" {
url = nats.DefaultURL
}

// Init NATS connection
nc, err := nats.Connect(url)
js, err := createNatsStream(discConfig.NatsURL)
if err != nil {
return nil, errors.Wrap(err, "Failed to connect to NATS")
}

js, _ := jetstream.New(nc)

cfgjs := jetstream.StreamConfig{
Name: "EVENTS",
Retention: jetstream.InterestPolicy,
Subjects: []string{"events.metadata_received", "events.peer_discovered"},
}

ctxJs := context.Background()

_, err = js.CreateOrUpdateStream(ctxJs, cfgjs)
if err != nil {
return nil, errors.Wrap(err, "Failed to create JetStream stream")
return nil, errors.Wrap(err, "failed to create NATS JetStream")
}

// New geth logger at debug level
Expand Down Expand Up @@ -169,7 +146,9 @@ func (d *DiscoveryV5) Serve(ctx context.Context) error {
// Start iterating over randomly discovered nodes
iter := d.Dv5Listener.RandomNodes()

d.startDiscoveryPublisher()
if d.js != nil {
d.startDiscoveryPublisher()
}

defer iter.Close()
defer close(d.out)
Expand Down Expand Up @@ -205,22 +184,6 @@ func (d *DiscoveryV5) Serve(ctx context.Context) error {

d.seenNodes[hInfo.ID] = NodeInfo{Node: *node, Flag: true}

externalIp := d.Dv5Listener.LocalNode().Node().IP()

d.log.Info().
Str("id", hInfo.ID.String()).
Str("ip", hInfo.IP).
Int("port", hInfo.Port).
Any("attnets", hInfo.Attr[EnrAttnetsAttribute]).
Str("enr", node.String()).
Str("external_ip", externalIp.String()).
Int("total", len(d.seenNodes)).
Msg("Discovered new node")

// Log to file
fmt.Fprintf(d.fileLogger, "%s ID: %s, IP: %s, Port: %d, ENR: %s, Maddr: %v, Attnets: %v, AttnetsNum: %v\n",
time.Now().Format(time.RFC3339), hInfo.ID.String(), hInfo.IP, hInfo.Port, node.String(), hInfo.MAddrs, hInfo.Attr[EnrAttnetsAttribute], hInfo.Attr[EnrAttnetsNumAttribute])

// Send peer event to channel
d.sendPeerEvent(ctx, node, hInfo)
}
Expand Down
66 changes: 66 additions & 0 deletions pkg/ethereum/nats.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,13 @@ package ethereum
import (
"context"
"encoding/json"
"fmt"
"time"

"github.com/ethereum/go-ethereum/p2p/enode"
"github.com/nats-io/nats.go"
"github.com/nats-io/nats.go/jetstream"
"github.com/pkg/errors"
eth "github.com/prysmaticlabs/prysm/v5/proto/prysm/v1alpha1"
)

Expand All @@ -30,10 +34,57 @@ type MetadataReceivedEvent struct {
Timestamp int64 `json:"timestamp"` // Timestamp in UNIX milliseconds
}

func createNatsStream(url string) (js jetstream.JetStream, err error) {
// If empty URL, return nil and run without NATS
if url == "" {
return nil, nil
}
// Initialize NATS JetStream
nc, err := nats.Connect(url)
if err != nil {
return nil, errors.Wrap(err, "Failed to connect to NATS")
}

js, err = jetstream.New(nc)
if err != nil {
return nil, errors.Wrap(err, "Failed to create JetStream context")
}

cfgjs := jetstream.StreamConfig{
Name: "EVENTS",
Retention: jetstream.InterestPolicy,
Subjects: []string{"events.metadata_received", "events.peer_discovered"},
}

ctxJs := context.Background()

_, err = js.CreateOrUpdateStream(ctxJs, cfgjs)
if err != nil {
return nil, errors.Wrap(err, "Failed to create JetStream stream")
}
return js, nil
}

func (n *Node) sendMetadataEvent(ctx context.Context, event *MetadataReceivedEvent) {
event.CrawlerID = getCrawlerMachineID()
event.CrawlerLoc = getCrawlerLocation()

n.log.Info().
Str("id", event.ID).
Str("multiaddr", event.Multiaddr).
Uint("epoch", event.Epoch).
Any("metadata", event.MetaData).
Str("client_version", event.ClientVersion).
Str("crawler_id", event.CrawlerID).
Str("crawler_location", event.CrawlerLoc).
Msg("metadata_received event")

if n.js == nil {
fmt.Fprintf(n.fileLogger, "%s ID: %s, Multiaddr: %s, Epoch: %d, Metadata: %v, ClientVersion: %s, CrawlerID: %s, CrawlerLoc: %s\n",
time.Now().Format(time.RFC3339), event.ID, event.Multiaddr, event.Epoch, event.MetaData, event.ClientVersion, event.CrawlerID, event.CrawlerLoc)
return
}

select {
case n.metadataEventChan <- event:
n.log.Trace().Str("peer", event.ID).Msg("Sent metadata_received event to channel")
Expand Down Expand Up @@ -77,6 +128,21 @@ func (d *DiscoveryV5) sendPeerEvent(ctx context.Context, node *enode.Node, hInfo
Timestamp: time.Now().UnixMilli(),
}

d.log.Info().
Str("enr", node.String()).
Str("id", hInfo.ID.String()).
Str("ip", hInfo.IP).
Int("port", hInfo.Port).
Str("crawler_id", peerEvent.CrawlerID).
Str("crawler_location", peerEvent.CrawlerLoc).
Msg("peer_discovered event")

if d.js == nil {
fmt.Fprintf(d.fileLogger, "%s ENR: %s, ID: %s, IP: %s, Port: %d, CrawlerID: %s, CrawlerLoc: %s\n",
time.Now().Format(time.RFC3339), node.String(), hInfo.ID.String(), hInfo.IP, hInfo.Port, peerEvent.CrawlerID, peerEvent.CrawlerLoc)
return
}

select {
case d.discEventChan <- peerEvent:
d.log.Debug().Str("peer", node.ID().String()).Msg("Sent peer_discovered event to channel")
Expand Down
38 changes: 8 additions & 30 deletions pkg/ethereum/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ import (
"github.com/libp2p/go-libp2p/p2p/security/noise"
"github.com/libp2p/go-libp2p/p2p/transport/tcp"
gomplex "github.com/libp2p/go-mplex"
"github.com/nats-io/nats.go"
"github.com/nats-io/nats.go/jetstream"
"github.com/pkg/errors"
"github.com/prysmaticlabs/prysm/v5/beacon-chain/p2p/encoder"
Expand Down Expand Up @@ -53,7 +52,7 @@ type Node struct {
func NewNode(cfg *config.NodeConfig) (*Node, error) {
log := log.NewLogger("node")

file, err := os.Create("handshakes.log")
file, err := os.Create(cfg.LogPath)
if err != nil {
return nil, errors.Wrap(err, "failed to create log file")
}
Expand All @@ -68,6 +67,7 @@ func NewNode(cfg *config.NodeConfig) (*Node, error) {

// TODO: read config from node config
conf := config.DefaultDiscConfig
conf.NatsURL = cfg.NatsURL
disc, err := NewDiscoveryV5(discKey, &conf)
if err != nil {
return nil, errors.Wrap(err, "failed to create DiscoveryV5 service")
Expand Down Expand Up @@ -112,33 +112,9 @@ func NewNode(cfg *config.NodeConfig) (*Node, error) {
return nil, fmt.Errorf("failed to create reqresp: %w", err)
}

url := os.Getenv("NATS_URL")
if url == "" {
url = nats.DefaultURL
}

// Initialize NATS JetStream
nc, err := nats.Connect(url)
js, err := createNatsStream(cfg.NatsURL)
if err != nil {
return nil, fmt.Errorf("failed to connect to NATS: %w", err)
}

js, err := jetstream.New(nc)
if err != nil {
return nil, fmt.Errorf("failed to create JetStream context: %w", err)
}

cfgjs := jetstream.StreamConfig{
Name: "EVENTS",
Retention: jetstream.InterestPolicy,
Subjects: []string{"events.metadata_received", "events.peer_discovered"},
}

ctxJs := context.Background()

_, err = js.CreateOrUpdateStream(ctxJs, cfgjs)
if err != nil {
return nil, errors.Wrap(err, "Failed to create JetStream stream")
return nil, errors.Wrap(err, "failed to create NATS JetStream")
}

// Log the node's peer ID and addresses
Expand Down Expand Up @@ -181,8 +157,10 @@ func (n *Node) Start(ctx context.Context) error {
// Register the node itself as the notifiee for network connection events
n.host.Network().Notify(n)

n.startMetadataPublisher()

if n.js != nil {
// Start the metadata event publisher
n.startMetadataPublisher()
}
// Start the discovery service
go n.runDiscovery(ctx)

Expand Down
6 changes: 0 additions & 6 deletions pkg/ethereum/node_notifiee.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"bytes"
"context"
"encoding/json"
"fmt"
"time"

"github.com/libp2p/go-libp2p/core/network"
Expand Down Expand Up @@ -104,9 +103,6 @@ func (n *Node) handleOutboundConnection(pid peer.ID) {
json, _ := json.Marshal(event)

n.log.Info().Msgf("Succesful handshake: %s", string(json))

fmt.Fprintln(n.fileLogger, string(json))

}

func (n *Node) handleInboundConnection(pid peer.ID) {
Expand Down Expand Up @@ -171,8 +167,6 @@ func (n *Node) handleInboundConnection(pid peer.ID) {
json, _ := json.Marshal(event)

n.log.Info().Msgf("Succesful handshake: %s", string(json))

fmt.Fprintln(n.fileLogger, string(json))
}

func (n *Node) waitForStatus(ctx context.Context, pid peer.ID) error {
Expand Down

0 comments on commit cb6ec57

Please sign in to comment.