Skip to content

Commit

Permalink
ip metadata loaded to clickhouse
Browse files Browse the repository at this point in the history
  • Loading branch information
hdser committed Sep 3, 2024
1 parent e56f0bb commit f230fe7
Show file tree
Hide file tree
Showing 7 changed files with 64,039 additions and 12,006 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -4,4 +4,5 @@ peerstore.db
*.parquet
*.sqlite
*.sqlite-journal
**/._*
api_keys.txt
39 changes: 20 additions & 19 deletions Dockerfile
Original file line number Diff line number Diff line change
@@ -1,41 +1,42 @@
ARG GO_VERSION=1.22.5
FROM golang:${GO_VERSION}-bookworm as builder

# Install CA certificates
RUN apt-get update && apt-get install -y ca-certificates && rm -rf /var/lib/apt/lists/*
# Stage 1: Dependency management and build
FROM golang:${GO_VERSION}-bookworm as builder

# Set the working directory inside the container to /usr/src/app
WORKDIR /usr/src/app
WORKDIR /app

# Copy the Go module files first to leverage Docker cache for dependency layers
# Copy go.mod and go.sum files
COPY go.mod go.sum ./

# Run module download separately to also leverage caching of downloaded modules
# Download dependencies and verify modules
RUN go mod download && go mod verify

# Copy the CSV file into a data directory within the builder stage
COPY ip_metadata.csv ./data/ip_metadata.csv

# Copy the rest of the application source code
COPY . .

# Build the application; output the binary to a known location
RUN go build -v -o /run-app .
# Run go mod tidy to ensure the go.mod file is up to date
RUN go mod tidy

# Expose port 9000 if it's being used by the application
EXPOSE 9000
# Build the application and capture the output
RUN go build -v -o /run-app .

# Final stage based on Debian Bookworm.
FROM debian:bookworm
# Stage 2: Final stage
FROM debian:bookworm-slim

# Install CA certificates in the final image to ensure they are present.
# Install CA certificates in the final image
RUN apt-get update && apt-get install -y ca-certificates && rm -rf /var/lib/apt/lists/*

# Copy the built executable from the builder stage
COPY --from=builder /run-app /usr/local/bin/run-app

# Copy the CSV and other data files from the builder stage to the runtime image
COPY --from=builder /usr/src/app/data /data
# Create necessary directory
RUN mkdir -p /app/data

# Copy the CSV file to /app/data
COPY /data/ip_metadata.csv /app/data/ip_metadata.csv

# Set the working directory
WORKDIR /app

# Set the command to run the application
CMD ["/usr/local/bin/run-app"]
175 changes: 173 additions & 2 deletions clickhouse/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,181 @@ import (
"github.com/chainbound/valtrack/log"
"github.com/chainbound/valtrack/types"
"github.com/rs/zerolog"

"encoding/csv"
"encoding/json"
"os"
"io"
"strconv"
"strings"
"regexp"
)

// LoadIPMetadataFromCSV seeds the ip_metadata table from the CSV file located
// at /app/data/ip_metadata.csv.
//
// The load is skipped (nil is returned) when the table already contains rows.
// Otherwise every CSV record with at least 9 columns is appended to a single
// batch insert; shorter records are logged and skipped. The columns read are:
//
//	0 IP, 1 hostname, 2 city, 3 region, 4 country,
//	5 "latitude,longitude", 7 postal code, 8 ASN description (JSON-like)
//
// Unparseable coordinates are stored as 0, and a missing ("nan") or
// unparseable ASN description is stored as empty ASN fields, so one bad cell
// never discards the rest of the row.
//
// An error is returned when the emptiness check, opening the file, reading a
// CSV record, or preparing/appending/sending the batch fails.
func (c *ClickhouseClient) LoadIPMetadataFromCSV() error {
	isEmpty, err := c.isTableEmpty("ip_metadata")
	if err != nil {
		return fmt.Errorf("failed to check if table is empty: %w", err)
	}
	if !isEmpty {
		c.log.Info().Msg("ip_metadata table is not empty, skipping CSV load")
		return nil
	}

	const csvPath = "/app/data/ip_metadata.csv"

	file, err := os.Open(csvPath)
	if err != nil {
		// Returned rather than logged here: the caller reports the failure.
		return fmt.Errorf("failed to open IP metadata CSV file: %w", err)
	}
	defer file.Close()

	c.log.Info().Str("path", csvPath).Msg("Successfully opened IP metadata CSV file")

	reader := csv.NewReader(file)
	reader.FieldsPerRecord = -1 // Allow variable number of fields

	batch, err := c.chConn.PrepareBatch(context.Background(), "INSERT INTO ip_metadata")
	if err != nil {
		return fmt.Errorf("failed to prepare batch: %w", err)
	}

	for {
		record, err := reader.Read()
		if err == io.EOF {
			break
		}
		if err != nil {
			return fmt.Errorf("error reading CSV record: %w", err)
		}

		// Ensure we have at least the minimum required fields
		if len(record) < 9 {
			c.log.Warn().Str("row", strings.Join(record, ",")).Msg("Skipping row with insufficient columns")
			continue
		}

		lat, lon := c.parseIPMetadataLocation(record[5])

		var asn ipMetadataASN
		if strings.ToLower(record[8]) == "nan" {
			// Keep the row: only the ASN columns are unknown.
			c.log.Warn().Str("original_asn_json", record[8]).Msg("ASN JSON is 'nan', using default values")
		} else {
			processed := normalizeIPMetadataASNJSON(record[8])
			c.log.Debug().Str("processed_asn_json", processed).Msg("Processed ASN JSON before parsing")

			if err := json.Unmarshal([]byte(processed), &asn); err != nil {
				c.log.Warn().Str("original_asn_json", record[8]).Str("processed_asn_json", processed).Err(err).Msg("Failed to parse ASN JSON, using default values")
				// Unmarshal may have filled some fields before failing.
				asn = ipMetadataASN{}
			}
		}

		err = batch.Append(
			record[0],                         // IP
			record[1],                         // Hostname
			record[2],                         // City
			record[3],                         // Region
			record[4],                         // Country
			lat,                               // Latitude
			lon,                               // Longitude
			record[7],                         // Postal Code
			strings.TrimPrefix(asn.ASN, "AS"), // ASN
			asn.Name,                          // ASN Organization
			asn.Type,                          // ASN Type
		)
		if err != nil {
			return fmt.Errorf("failed to append to batch: %w", err)
		}
	}

	if err := batch.Send(); err != nil {
		return fmt.Errorf("failed to send batch: %w", err)
	}

	c.log.Info().Msg("Successfully loaded IP metadata from CSV")
	return nil
}

// ipMetadataASN mirrors the ASN object stored in column 8 of the IP metadata
// CSV. Only ASN, Name and Type are written to ClickHouse.
type ipMetadataASN struct {
	ASN    string `json:"asn"`
	Name   string `json:"name"`
	Type   string `json:"type"`
	Domain string `json:"domain"`
	Route  string `json:"route"`
}

// ipMetadataASNNameRe matches the "name" member of the ASN object. It is
// compiled once at package scope instead of once per CSV row.
var ipMetadataASNNameRe = regexp.MustCompile(`"name":\s*"((?:[^"\\]|\\.)*)"`)

// parseIPMetadataLocation splits a "latitude,longitude" cell into two floats.
// A component that is missing or fails to parse yields 0; parse failures are
// logged as warnings. A cell without a comma (for example "nan" or an empty
// string) is treated as having an empty longitude instead of panicking.
func (c *ClickhouseClient) parseIPMetadataLocation(loc string) (float64, float64) {
	// strings.Split with a non-empty separator always returns at least one
	// element, so parts[0] is safe; parts[1] is only present with a comma.
	parts := strings.Split(loc, ",")
	latStr, lonStr := parts[0], ""
	if len(parts) > 1 {
		lonStr = parts[1]
	}

	lat, err := parseFloat(latStr)
	if err != nil {
		c.log.Warn().Str("latitude", latStr).Err(err).Msg("Invalid latitude, using 0")
		lat = 0
	}
	lon, err := parseFloat(lonStr)
	if err != nil {
		c.log.Warn().Str("longitude", lonStr).Err(err).Msg("Invalid longitude, using 0")
		lon = 0
	}
	return lat, lon
}

// normalizeIPMetadataASNJSON rewrites the single-quoted, JSON-like ASN cell
// into text that encoding/json can attempt to parse: single quotes become
// double quotes, the "name" member is re-emitted with its quotes escaped, and
// a closing brace is appended when the text does not already end with one.
func normalizeIPMetadataASNJSON(raw string) string {
	// Replace single quotes with double quotes
	out := strings.ReplaceAll(raw, "'", "\"")

	// NOTE(review): the pattern only matches values whose inner quotes are
	// already backslash-escaped, so this pass appears to change nothing for
	// plain values — confirm whether it is still needed.
	out = ipMetadataASNNameRe.ReplaceAllStringFunc(out, func(match string) string {
		parts := ipMetadataASNNameRe.FindStringSubmatch(match)
		if len(parts) < 2 {
			return match
		}
		// Escape any double quotes within the value
		escapedValue := strings.ReplaceAll(parts[1], "\"", "\\\"")
		return fmt.Sprintf(`"name": "%s"`, escapedValue)
	})

	// Ensure the JSON string is complete
	if !strings.HasSuffix(out, "}") {
		out += "}"
	}
	return out
}

// parseFloat converts s to a float64 after trimming surrounding whitespace.
// An empty string or any casing of "nan" yields (0, nil); every other input
// is handed to strconv.ParseFloat and its result and error are returned as is.
func parseFloat(s string) (float64, error) {
	trimmed := strings.TrimSpace(s)
	switch strings.ToLower(trimmed) {
	case "", "nan":
		// Missing values are represented as zero rather than as an error.
		return 0, nil
	}
	return strconv.ParseFloat(trimmed, 64)
}

// isTableEmpty reports whether the named table currently holds zero rows.
//
// tableName is interpolated directly into the query text, so it must be a
// trusted identifier and never user-supplied input. A query or scan failure
// is returned as (false, err).
func (c *ClickhouseClient) isTableEmpty(tableName string) (bool, error) {
	var rows uint64
	row := c.chConn.QueryRow(context.Background(), fmt.Sprintf("SELECT count(*) FROM %s", tableName))
	if scanErr := row.Scan(&rows); scanErr != nil {
		return false, scanErr
	}
	return rows == 0, nil
}

func ValidatorMetadataDDL(db string) string {
return fmt.Sprintf(`CREATE TABLE IF NOT EXISTS %s.validator_metadata (
return fmt.Sprintf(`
CREATE TABLE IF NOT EXISTS %s.validator_metadata (
enr String,
id String,
multiaddr String,
Expand Down Expand Up @@ -251,4 +422,4 @@ func sendBatch(client *ClickhouseClient, batch driver.Batch) error {
}
client.log.Info().Msg("Batch sent successfully")
return nil
}
}
21 changes: 20 additions & 1 deletion consumer/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -274,6 +274,11 @@ func (c *Consumer) Start(name string) error {
return err
}

// Load IP metadata from CSV
if err := c.chClient.LoadIPMetadataFromCSV(); err != nil {
c.log.Error().Err(err).Msg("Failed to load IP metadata from CSV")
}

go func() {
for {
batch, err := consumer.FetchNoWait(BATCH_SIZE)
Expand Down Expand Up @@ -303,7 +308,7 @@ func handleMessage(c *Consumer, msg jetstream.Msg) {
progress := float64(md.Sequence.Stream) / (float64(md.NumPending) + float64(md.Sequence.Stream)) * 100

switch msg.Subject() {
case "events.ip_metadata":
case "events.ip_metadata":
var ipEvent types.IPMetadataEvent
if err := json.Unmarshal(msg.Data(), &ipEvent); err != nil {
c.log.Err(err).Msg("Error unmarshaling IPMetadataEvent")
Expand All @@ -313,6 +318,11 @@ func handleMessage(c *Consumer, msg jetstream.Msg) {
c.log.Info().Str("IP", ipEvent.IP).Msg("IP metadata received")
c.ipMetadataChan <- &ipEvent

// Send to ClickHouse if client is initialized
if c.chClient != nil {
c.chClient.IPMetadataEventChan <- &ipEvent
}

case "events.peer_discovered":
var event types.PeerDiscoveredEvent
if err := json.Unmarshal(msg.Data(), &event); err != nil {
Expand All @@ -324,6 +334,11 @@ func handleMessage(c *Consumer, msg jetstream.Msg) {
c.log.Info().Time("timestamp", md.Timestamp).Uint64("pending", md.NumPending).Str("progress", fmt.Sprintf("%.2f%%", progress)).Msg("peer_discovered")
c.storeDiscoveryEvent(event)

// Send to ClickHouse if client is initialized
if c.chClient != nil {
c.chClient.PeerDiscoveredEventChan <- &event
}

case "events.metadata_received":
var event types.MetadataReceivedEvent
if err := json.Unmarshal(msg.Data(), &event); err != nil {
Expand All @@ -336,6 +351,10 @@ func handleMessage(c *Consumer, msg jetstream.Msg) {
c.handleMetadataEvent(event)
c.storeMetadataEvent(event)

// Send to ClickHouse if client is initialized
if c.chClient != nil {
c.chClient.MetadataReceivedEventChan <- &event
}

default:
c.log.Warn().Str("subject", msg.Subject()).Msg("Unknown event type")
Expand Down
Loading

0 comments on commit f230fe7

Please sign in to comment.