diff --git a/Dockerfile.valtrack b/Dockerfile.valtrack index 18ec380..d25ff18 100644 --- a/Dockerfile.valtrack +++ b/Dockerfile.valtrack @@ -12,4 +12,4 @@ EXPOSE 9000 FROM debian:bookworm COPY --from=builder /run-app /usr/local/bin/ -CMD ["run-app"] \ No newline at end of file +CMD ["run-app", "sentry"] \ No newline at end of file diff --git a/clickhouse/client.go b/clickhouse/client.go index d7ec6e8..4a0f876 100644 --- a/clickhouse/client.go +++ b/clickhouse/client.go @@ -21,7 +21,7 @@ func ValidatorMetadataDDL(db string) string { port UInt16, last_seen UInt64, last_epoch UInt64, - possible_validator UInt8, + possible_validator Bool, average_validator_count Int32, num_observations UInt64 ) ENGINE = MergeTree() @@ -36,7 +36,7 @@ type ValidatorMetadataEvent struct { Port uint16 `ch:"port"` LastSeen uint64 `ch:"last_seen"` LastEpoch uint64 `ch:"last_epoch"` - PossibleValidator uint8 `ch:"possible_validator"` // Using uint as bool + PossibleValidator bool `ch:"possible_validator"` // Using uint as bool AverageValidatorCount int32 `ch:"average_validator_count"` NumObservations uint64 `ch:"num_observations"` } @@ -151,7 +151,7 @@ func (c *ClickhouseClient) validatorEventBatcher() { } } - c.log.Debug().Str("took", time.Since(start).String()).Int("channel_len", len(c.ValidatorEventChan)).Msg("Inserted validator_metadata batch") + c.log.Info().Str("took", time.Since(start).String()).Int("channel_len", len(c.ValidatorEventChan)).Msg("Inserted validator_metadata batch") // Reset batch for { diff --git a/cmd/cmd.go b/cmd/cmd.go index 8fe3dca..ced47c3 100644 --- a/cmd/cmd.go +++ b/cmd/cmd.go @@ -53,7 +53,7 @@ var ConsumerCommand = &cli.Command{ &cli.Uint64Flag{ Name: "batch-size", Usage: "Clickhouse max validator batch size", - Value: 64, + Value: 128, }, }, } @@ -64,11 +64,11 @@ func LaunchConsumer(c *cli.Context) error { NatsURL: c.String("nats-url"), Name: c.String("name"), ChCfg: clickhouse.ClickhouseConfig{ - Endpoint: c.String("clickhouse-endpoint"), - DB: c.String("clickhouse-db"), - Username: c.String("clickhouse-username"), - Password: c.String("clickhouse-password"), - MaxValidatorBatchSize: c.Uint64("clickhouse-max-validator-batch-size"), + Endpoint: c.String("endpoint"), + DB: c.String("db"), + Username: c.String("username"), + Password: c.String("password"), + MaxValidatorBatchSize: c.Uint64("batch-size"), }, } diff --git a/consumer/consumer.go b/consumer/consumer.go index 2f3d5ce..3e57b1e 100644 --- a/consumer/consumer.go +++ b/consumer/consumer.go @@ -159,7 +159,7 @@ func RunConsumer(cfg *ConsumerConfig) { Username: cfg.ChCfg.Username, Password: cfg.ChCfg.Password, - MaxValidatorBatchSize: 10, + MaxValidatorBatchSize: cfg.ChCfg.MaxValidatorBatchSize, } var chClient *ch.ClickhouseClient @@ -182,7 +182,7 @@ func RunConsumer(cfg *ConsumerConfig) { validatorWriter: validatorWriter, js: js, - validatorMetadataChan: make(chan *ethereum.MetadataReceivedEvent, 1000), + validatorMetadataChan: make(chan *ethereum.MetadataReceivedEvent, 16384), validatorCache: make(map[string]*ch.ValidatorMetadataEvent), chClient: chClient, @@ -370,22 +370,38 @@ func (c *Consumer) HandleValidatorMetadataEvent() error { for { select { case event := <-c.validatorMetadataChan: - c.log.Info().Any("event", event).Msg("Received validator event") + c.log.Trace().Any("event", event).Msg("Received validator event") - maddr, _ := ma.NewMultiaddr(event.Multiaddr) + maddr, err := ma.NewMultiaddr(event.Multiaddr) + if err != nil { + c.log.Error().Err(err).Msg("Invalid multiaddr") + continue + } - ip, _ := maddr.ValueForProtocol(ma.P_IP4) + ip, err := maddr.ValueForProtocol(ma.P_IP4) + if err != nil { + c.log.Error().Err(err).Msg("Invalid IP in multiaddr") + continue + } - portStr, _ := maddr.ValueForProtocol(ma.P_TCP) + portStr, err := maddr.ValueForProtocol(ma.P_TCP) + if err != nil { + c.log.Error().Err(err).Msg("Invalid port in multiaddr") + continue + } - port, _ := strconv.Atoi(portStr) + port, err := strconv.Atoi(portStr) + if err != nil { + c.log.Error().Err(err).Msg("Invalid port number") + continue + } - isValidator := 1 + isValidator := true longLived := indexesFromBitfield(event.MetaData.Attnets) shortLived := extractShortLivedSubnets(event.SubscribedSubnets, longLived) // If there are no short lived subnets, then the peer is not a validator if len(shortLived) == 0 { - isValidator = 0 + isValidator = false } prevCache, found := c.validatorCache[event.ID] @@ -412,17 +428,17 @@ func (c *Consumer) HandleValidatorMetadataEvent() error { Port: uint16(port), LastSeen: uint64(event.Timestamp), LastEpoch: uint64(event.Epoch), - PossibleValidator: uint8(isValidator), + PossibleValidator: isValidator, AverageValidatorCount: currAvgValidatorCount, NumObservations: prevNumObservations + 1, } - c.log.Info().Any("validator_metadata", validatorMetadata).Msg("Inserted validator metadata") c.validatorCache[event.ID] = &validatorMetadata // Write to Clickhouse if c.chClient != nil { c.chClient.ValidatorEventChan <- &validatorMetadata + c.log.Trace().Any("validator_metadata", validatorMetadata).Msg("Inserted validator metadata") } default: c.log.Debug().Msg("No validator metadata event")