Skip to content

Commit

Permalink
fix ipinfo
Browse files Browse the repository at this point in the history
  • Loading branch information
hdser committed Sep 30, 2024
1 parent e63e49c commit 656336b
Show file tree
Hide file tree
Showing 4 changed files with 92 additions and 80 deletions.
33 changes: 11 additions & 22 deletions Dockerfile
Original file line number Diff line number Diff line change
@@ -1,42 +1,31 @@
ARG GO_VERSION=1.22.5
FROM golang:1.22.5-bookworm as builder

# Stage 1: Dependency management and build
FROM golang:${GO_VERSION}-bookworm as builder
# Install CA certificates and build essentials
RUN apt-get update && apt-get install -y ca-certificates build-essential && rm -rf /var/lib/apt/lists/*

# Set the working directory inside the container to /app
WORKDIR /app

# Copy go.mod and go.sum files
# Copy the Go module files
COPY go.mod go.sum ./

# Download dependencies and verify modules
RUN go mod download && go mod verify
# Download dependencies
RUN go mod download

# Copy the rest of the application source code
COPY . .

# Run go mod tidy to ensure the go.mod file is up to date
RUN go mod tidy
# Build the application; output the binary to a known location
RUN go build -v -o /run-app .

# Build the application and capture the output
RUN go build -v -o /run-app .

# Stage 2: Final stage
# Final stage based on Debian Bookworm-slim
FROM debian:bookworm-slim

# Install CA certificates in the final image
# Install CA certificates in the final image to ensure they are present.
RUN apt-get update && apt-get install -y ca-certificates && rm -rf /var/lib/apt/lists/*

# Copy the built executable from the builder stage
COPY --from=builder /run-app /usr/local/bin/run-app

# Create necessary directory
RUN mkdir -p /app/data

# Copy the CSV file to /app/data
COPY /data/ip_metadata.csv /app/data/ip_metadata.csv

# Set the working directory
WORKDIR /app

# Set the command to run the application
CMD ["/usr/local/bin/run-app"]
127 changes: 75 additions & 52 deletions consumer/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,26 +6,27 @@ import (
"encoding/hex"
"encoding/json"
"fmt"
"net"
"net/http"
"os"
"os/signal"
"strconv"
"strings"
"sync"
"syscall"
"time"
"sync"
"net"
"strconv"

ma "github.com/multiformats/go-multiaddr"
ch "github.com/chainbound/valtrack/clickhouse"
"github.com/chainbound/valtrack/log"
"github.com/chainbound/valtrack/types"
"github.com/ipinfo/go/v2/ipinfo"
ma "github.com/multiformats/go-multiaddr"
_ "github.com/mattn/go-sqlite3"
"github.com/nats-io/nats.go"
"github.com/nats-io/nats.go/jetstream"
"github.com/rs/zerolog"
"github.com/xitongsys/parquet-go-source/local"
"github.com/xitongsys/parquet-go/writer"
"github.com/ipinfo/go/v2/ipinfo"
)

const basePath = "/data"
Expand All @@ -48,10 +49,10 @@ type Consumer struct {
ipMetadataWriter *writer.ParquetWriter
js jetstream.JetStream

peerDiscoveredChan chan *types.PeerDiscoveredEvent
metadataReceivedChan chan *types.MetadataReceivedEvent
validatorMetadataChan chan *types.MetadataReceivedEvent
ipMetadataChan chan *types.IPMetadataEvent
peerDiscoveredChan chan *types.PeerDiscoveredEvent
metadataReceivedChan chan *types.MetadataReceivedEvent
validatorMetadataChan chan *types.MetadataReceivedEvent
ipMetadataChan chan *types.IPMetadataEvent

chClient *ch.ClickhouseClient
db *sql.DB
Expand All @@ -71,7 +72,7 @@ func NewConsumer(cfg *ConsumerConfig, log zerolog.Logger, js jetstream.JetStream
}

metadataFilePath := fmt.Sprintf("%s/metadata_events.parquet", basePath)
w_metadata, err := local.NewLocalFileWriter(metadataFilePath)
w_metadata, err := local.NewLocalFileWriter(metadataFilePath)
if err != nil {
return nil, fmt.Errorf("error creating metadata events parquet file: %w", err)
}
Expand Down Expand Up @@ -116,17 +117,17 @@ func NewConsumer(cfg *ConsumerConfig, log zerolog.Logger, js jetstream.JetStream
ipMetadataWriter: ipMetadataWriter,
js: js,

peerDiscoveredChan: make(chan *types.PeerDiscoveredEvent, 16384),
metadataReceivedChan: make(chan *types.MetadataReceivedEvent, 16384),
validatorMetadataChan: make(chan *types.MetadataReceivedEvent, 16384),
ipMetadataChan: make(chan *types.IPMetadataEvent, 16384),
peerDiscoveredChan: make(chan *types.PeerDiscoveredEvent, 16384),
metadataReceivedChan: make(chan *types.MetadataReceivedEvent, 16384),
validatorMetadataChan: make(chan *types.MetadataReceivedEvent, 16384),
ipMetadataChan: make(chan *types.IPMetadataEvent, 16384),

chClient: chClient,
db: db,
dune: dune,

ipCache: make(map[string]*types.IPMetadataEvent),
ipCacheTTL: 1 * time.Hour,
ipCache: make(map[string]*types.IPMetadataEvent),
ipCacheTTL: 1 * time.Hour,
ipInfoToken: os.Getenv("IPINFO_TOKEN"),
}, nil
}
Expand Down Expand Up @@ -164,13 +165,13 @@ func RunConsumer(cfg *ConsumerConfig) {
}

chCfg := ch.ClickhouseConfig{
Endpoint: cfg.ChCfg.Endpoint,
DB: cfg.ChCfg.DB,
Username: cfg.ChCfg.Username,
Password: cfg.ChCfg.Password,
MaxValidatorBatchSize: cfg.ChCfg.MaxValidatorBatchSize,
MaxIPMetadataBatchSize: cfg.ChCfg.MaxIPMetadataBatchSize,
MaxPeerDiscoveredEventsBatchSize: cfg.ChCfg.MaxPeerDiscoveredEventsBatchSize,
Endpoint: cfg.ChCfg.Endpoint,
DB: cfg.ChCfg.DB,
Username: cfg.ChCfg.Username,
Password: cfg.ChCfg.Password,
MaxValidatorBatchSize: cfg.ChCfg.MaxValidatorBatchSize,
MaxIPMetadataBatchSize: cfg.ChCfg.MaxIPMetadataBatchSize,
MaxPeerDiscoveredEventsBatchSize: cfg.ChCfg.MaxPeerDiscoveredEventsBatchSize,
MaxMetadataReceivedEventsBatchSize: cfg.ChCfg.MaxMetadataReceivedEventsBatchSize,
}

Expand All @@ -181,7 +182,7 @@ func RunConsumer(cfg *ConsumerConfig) {
log.Error().Err(err).Msg("Error creating Clickhouse client")
return
}
defer chClient.Close() // Add this line to ensure the client is closed
defer chClient.Close()

err = chClient.Start()
if err != nil {
Expand Down Expand Up @@ -269,15 +270,12 @@ func RunConsumer(cfg *ConsumerConfig) {
log.Error().Err(err).Msg("Error shutting down HTTP server")
}

// If you have a Clickhouse client, close it here
if chClient != nil {
if err := chClient.Close(); err != nil {
log.Error().Err(err).Msg("Error closing Clickhouse client")
}
}

// Any other cleanup can go here

log.Info().Msg("Consumer shutdown complete")
}

Expand All @@ -304,9 +302,10 @@ func (c *Consumer) Start(name string) error {
return err
}

// Load IP metadata from CSV
if err := c.chClient.LoadIPMetadataFromCSV(); err != nil {
c.log.Error().Err(err).Msg("Failed to load IP metadata from CSV")
if c.chClient != nil {
if err := c.chClient.LoadIPMetadataFromCSV(); err != nil {
c.log.Error().Err(err).Msg("Failed to load IP metadata from CSV")
}
}

go func() {
Expand Down Expand Up @@ -346,7 +345,6 @@ func (c *Consumer) handleMessage(msg jetstream.Msg) {
c.log.Info().Str("IP", ipEvent.IP).Msg("IP metadata received")
c.ipMetadataChan <- &ipEvent

// Send to ClickHouse if client is initialized
if c.chClient != nil {
c.chClient.IPMetadataEventChan <- &ipEvent
}
Expand All @@ -360,21 +358,18 @@ func (c *Consumer) handleMessage(msg jetstream.Msg) {
}

c.log.Info().Time("timestamp", md.Timestamp).Uint64("pending", md.NumPending).Str("progress", fmt.Sprintf("%.2f%%", progress)).Msg("peer_discovered")

// Fetch IP metadata synchronously

ipMetadata, err := c.getIPMetadata(event.IP)
if err != nil {
c.log.Error().Err(err).Str("ip", event.IP).Msg("Failed to fetch IP metadata")
} else {
// Insert IP metadata into ClickHouse
if err := c.ensureIPMetadataInClickHouse(ipMetadata); err != nil {
c.log.Error().Err(err).Str("ip", event.IP).Msg("Failed to ensure IP metadata in ClickHouse")
}
}

c.storeDiscoveryEvent(event)

// Send to ClickHouse if client is initialized
if c.chClient != nil {
c.chClient.PeerDiscoveredEventChan <- &event
}
Expand All @@ -391,7 +386,6 @@ func (c *Consumer) handleMessage(msg jetstream.Msg) {
c.handleMetadataEvent(event)
c.storeMetadataEvent(event)

// Send to ClickHouse if client is initialized
if c.chClient != nil {
c.chClient.MetadataReceivedEventChan <- &event
}
Expand All @@ -416,7 +410,10 @@ func (c *Consumer) getIPMetadata(ip string) (*types.IPMetadataEvent, error) {

// Check ClickHouse
metadata, err := c.getIPMetadataFromClickHouse(ip)
if err == nil {
if err != nil {
return nil, fmt.Errorf("error querying ClickHouse: %w", err)
}
if metadata != nil {
// Found in ClickHouse, cache and return
c.cacheIPMetadata(ip, metadata)
return metadata, nil
Expand All @@ -429,6 +426,12 @@ func (c *Consumer) getIPMetadata(ip string) (*types.IPMetadataEvent, error) {
}

metadata = convertIPInfoToMetadata(ipInfo)

// Store metadata into ClickHouse
if err := c.ensureIPMetadataInClickHouse(metadata); err != nil {
return nil, fmt.Errorf("failed to ensure IP metadata in ClickHouse: %w", err)
}

c.cacheIPMetadata(ip, metadata)

return metadata, nil
Expand All @@ -440,13 +443,13 @@ func (c *Consumer) getIPMetadataFromClickHouse(ip string) (*types.IPMetadataEven

var metadata types.IPMetadataEvent
query := fmt.Sprintf("SELECT ip, hostname, city, region, country, latitude, longitude, postal_code, asn, asn_organization, asn_type FROM ip_metadata WHERE ip = '%s'", ip)

err := c.chClient.QueryRow(ctx, query).Scan(
&metadata.IP, &metadata.Hostname, &metadata.City, &metadata.Region,
&metadata.Country, &metadata.Latitude, &metadata.Longitude,
&metadata.PostalCode, &metadata.ASN, &metadata.ASNOrganization, &metadata.ASNType,
)

if err != nil {
if err == sql.ErrNoRows {
return nil, nil // No metadata found for this IP
Expand All @@ -457,34 +460,52 @@ func (c *Consumer) getIPMetadataFromClickHouse(ip string) (*types.IPMetadataEven
return &metadata, nil
}

func (c *Consumer) fetchIPInfoFromAPI(ip string) (*ipinfo.IPInfo, error) {
client := ipinfo.NewClient(nil, ipinfo.NewCache(nil), c.ipInfoToken)
info, err := client.GetIPInfo(net.ParseIP(ip))
func (c *Consumer) fetchIPInfoFromAPI(ip string) (*ipinfo.Core, error) {
client := ipinfo.NewClient(nil, nil, c.ipInfoToken)
ipParsed := net.ParseIP(ip)
if ipParsed == nil {
return nil, fmt.Errorf("invalid IP address: %s", ip)
}
info, err := client.GetIPInfo(ipParsed)
if err != nil {
return nil, fmt.Errorf("IPInfo API error: %w", err)
}
return info, nil
}

func convertIPInfoToMetadata(info *ipinfo.IPInfo) *types.IPMetadataEvent {
lat, _ := strconv.ParseFloat(info.Latitude, 64)
lon, _ := strconv.ParseFloat(info.Longitude, 64)

func convertIPInfoToMetadata(info *ipinfo.Core) *types.IPMetadataEvent {
var lat, long float64
if info.Location != "" {
parts := strings.Split(info.Location, ",")
if len(parts) == 2 {
lat, _ = strconv.ParseFloat(parts[0], 64)
long, _ = strconv.ParseFloat(parts[1], 64)
}
}

var asn, asnOrganization, asnType string
if info.ASN != nil {
asn = info.ASN.ASN
asnOrganization = info.ASN.Name
asnType = info.ASN.Type
}

return &types.IPMetadataEvent{
IP: info.IP.String(),
Hostname: info.Hostname,
City: info.City,
Region: info.Region,
Country: info.CountryName,
Country: info.Country,
Latitude: lat,
Longitude: lon,
Longitude: long,
PostalCode: info.Postal,
ASN: info.ASN,
ASNOrganization: info.Org,
ASNType: "", // IPInfo might not provide ASN type directly
ASN: asn,
ASNOrganization: asnOrganization,
ASNType: asnType,
}
}


func (c *Consumer) cacheIPMetadata(ip string, metadata *types.IPMetadataEvent) {
c.ipCacheMu.Lock()
defer c.ipCacheMu.Unlock()
Expand All @@ -506,6 +527,8 @@ func (c *Consumer) ensureIPMetadataInClickHouse(metadata *types.IPMetadataEvent)
}
}



func (c *Consumer) handleMetadataEvent(event types.MetadataReceivedEvent) {
longLived := indexesFromBitfield(event.MetaData.Attnets)

Expand Down Expand Up @@ -625,4 +648,4 @@ func (c *Consumer) sendIPMetadataToClickHouse(ipEvent *types.IPMetadataEvent) er
default:
return fmt.Errorf("ClickHouse channel is full or unavailable")
}
}
}
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ require (
github.com/grpc-ecosystem/grpc-gateway/v2 v2.19.0 // indirect
github.com/hashicorp/golang-lru v0.5.5-0.20210104140557-80c98217689d // indirect
github.com/hashicorp/golang-lru/v2 v2.0.5 // indirect
github.com/herumi/bls-eth-go-binary v1.28.1 // indirect
github.com/herumi/bls-eth-go-binary v1.36.1 // indirect
github.com/holiman/uint256 v1.2.4 // indirect
github.com/huin/goupnp v1.3.0 // indirect
github.com/ipfs/go-cid v0.4.1 // indirect
Expand Down Expand Up @@ -183,7 +183,7 @@ require (
golang.org/x/mod v0.17.0 // indirect
golang.org/x/net v0.24.0 // indirect
golang.org/x/oauth2 v0.16.0 // indirect
golang.org/x/sync v0.7.0 // indirect
golang.org/x/sync v0.8.0 // indirect
golang.org/x/sys v0.19.0 // indirect
golang.org/x/term v0.19.0 // indirect
golang.org/x/text v0.14.0 // indirect
Expand Down
8 changes: 4 additions & 4 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -631,8 +631,8 @@ github.com/hashicorp/logutils v1.0.0/go.mod h1:QIAnNjmIWmVIIkWDTG1z5v++HQmx9WQRO
github.com/hashicorp/mdns v1.0.0/go.mod h1:tL+uN++7HEJ6SQLQ2/p+z2pH24WQKWjBPkE0mNTz8vQ=
github.com/hashicorp/memberlist v0.1.3/go.mod h1:ajVTdAv/9Im8oMAAj5G31PhhMCZJV2pPBoIllUwCN7I=
github.com/hashicorp/serf v0.8.2/go.mod h1:6hOLApaqBFA1NXqRQAsxw9QxuDEvNxSQRwA/JwenrHc=
github.com/herumi/bls-eth-go-binary v1.28.1 h1:fcIZ48y5EE9973k05XjE8+P3YiQgjZz4JI/YabAm8KA=
github.com/herumi/bls-eth-go-binary v1.28.1/go.mod h1:luAnRm3OsMQeokhGzpYmc0ZKwawY7o87PUEP11Z7r7U=
github.com/herumi/bls-eth-go-binary v1.36.1 h1:SfLjxbO1fWkKtKS7J3Ezd1/5QXrcaTZgWynxdSe10hQ=
github.com/herumi/bls-eth-go-binary v1.36.1/go.mod h1:luAnRm3OsMQeokhGzpYmc0ZKwawY7o87PUEP11Z7r7U=
github.com/holiman/billy v0.0.0-20240216141850-2abb0c79d3c4 h1:X4egAf/gcS1zATw6wn4Ej8vjuVGxeHdan+bRb2ebyv4=
github.com/holiman/billy v0.0.0-20240216141850-2abb0c79d3c4/go.mod h1:5GuXa7vkL8u9FkFuWdVvfR5ix8hRB7DbOAaYULamFpc=
github.com/holiman/bloomfilter/v2 v2.0.3 h1:73e0e/V0tCydx14a0SCYS/EWCxgwLZ18CZcZKVu0fao=
Expand Down Expand Up @@ -1482,8 +1482,8 @@ golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJ
golang.org/x/sync v0.0.0-20220513210516-0976fa681c29/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.1.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.7.0 h1:YsImfSBoP9QPYL0xyKJPq0gcaJdG3rInoqxTWbfQu9M=
golang.org/x/sync v0.7.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk=
golang.org/x/sync v0.8.0 h1:3NFvSEYkUoMifnESzZl15y791HH1qU2xm6eCJU5ZPXQ=
golang.org/x/sync v0.8.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk=
golang.org/x/sys v0.0.0-20180810173357-98c5dad5d1a0/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20180823144017-11551d06cbcc/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
Expand Down

0 comments on commit 656336b

Please sign in to comment.