Skip to content

Commit

Permalink
misc(db): fixes, logging, cmd
Browse files Browse the repository at this point in the history
  • Loading branch information
namn-grg committed Jun 1, 2024
1 parent e9a10c9 commit 6c8da92
Show file tree
Hide file tree
Showing 4 changed files with 37 additions and 21 deletions.
2 changes: 1 addition & 1 deletion Dockerfile.valtrack
Original file line number Diff line number Diff line change
Expand Up @@ -12,4 +12,4 @@ EXPOSE 9000
FROM debian:bookworm

COPY --from=builder /run-app /usr/local/bin/
CMD ["run-app"]
CMD ["run-app", "sentry"]
6 changes: 3 additions & 3 deletions clickhouse/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ func ValidatorMetadataDDL(db string) string {
port UInt16,
last_seen UInt64,
last_epoch UInt64,
possible_validator UInt8,
possible_validator Bool,
average_validator_count Int32,
num_observations UInt64
) ENGINE = MergeTree()
Expand All @@ -36,7 +36,7 @@ type ValidatorMetadataEvent struct {
Port uint16 `ch:"port"`
LastSeen uint64 `ch:"last_seen"`
LastEpoch uint64 `ch:"last_epoch"`
PossibleValidator uint8 `ch:"possible_validator"` // Using uint as bool
PossibleValidator bool `ch:"possible_validator"` // Using uint as bool
AverageValidatorCount int32 `ch:"average_validator_count"`
NumObservations uint64 `ch:"num_observations"`
}
Expand Down Expand Up @@ -151,7 +151,7 @@ func (c *ClickhouseClient) validatorEventBatcher() {
}
}

c.log.Debug().Str("took", time.Since(start).String()).Int("channel_len", len(c.ValidatorEventChan)).Msg("Inserted validator_metadata batch")
c.log.Info().Str("took", time.Since(start).String()).Int("channel_len", len(c.ValidatorEventChan)).Msg("Inserted validator_metadata batch")

// Reset batch
for {
Expand Down
12 changes: 6 additions & 6 deletions cmd/cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ var ConsumerCommand = &cli.Command{
&cli.Uint64Flag{
Name: "batch-size",
Usage: "Clickhouse max validator batch size",
Value: 64,
Value: 128,
},
},
}
Expand All @@ -64,11 +64,11 @@ func LaunchConsumer(c *cli.Context) error {
NatsURL: c.String("nats-url"),
Name: c.String("name"),
ChCfg: clickhouse.ClickhouseConfig{
Endpoint: c.String("clickhouse-endpoint"),
DB: c.String("clickhouse-db"),
Username: c.String("clickhouse-username"),
Password: c.String("clickhouse-password"),
MaxValidatorBatchSize: c.Uint64("clickhouse-max-validator-batch-size"),
Endpoint: c.String("endpoint"),
DB: c.String("db"),
Username: c.String("username"),
Password: c.String("password"),
MaxValidatorBatchSize: c.Uint64("batch-size"),
},
}

Expand Down
38 changes: 27 additions & 11 deletions consumer/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ func RunConsumer(cfg *ConsumerConfig) {
Username: cfg.ChCfg.Username,
Password: cfg.ChCfg.Password,

MaxValidatorBatchSize: 10,
MaxValidatorBatchSize: cfg.ChCfg.MaxValidatorBatchSize,
}

var chClient *ch.ClickhouseClient
Expand All @@ -182,7 +182,7 @@ func RunConsumer(cfg *ConsumerConfig) {
validatorWriter: validatorWriter,
js: js,

validatorMetadataChan: make(chan *ethereum.MetadataReceivedEvent, 1000),
validatorMetadataChan: make(chan *ethereum.MetadataReceivedEvent, 16384),
validatorCache: make(map[string]*ch.ValidatorMetadataEvent),

chClient: chClient,
Expand Down Expand Up @@ -370,22 +370,38 @@ func (c *Consumer) HandleValidatorMetadataEvent() error {
for {
select {
case event := <-c.validatorMetadataChan:
c.log.Info().Any("event", event).Msg("Received validator event")
c.log.Trace().Any("event", event).Msg("Received validator event")

maddr, _ := ma.NewMultiaddr(event.Multiaddr)
maddr, err := ma.NewMultiaddr(event.Multiaddr)
if err != nil {
c.log.Error().Err(err).Msg("Invalid multiaddr")
continue
}

ip, _ := maddr.ValueForProtocol(ma.P_IP4)
ip, err := maddr.ValueForProtocol(ma.P_IP4)
if err != nil {
c.log.Error().Err(err).Msg("Invalid IP in multiaddr")
continue
}

portStr, _ := maddr.ValueForProtocol(ma.P_TCP)
portStr, err := maddr.ValueForProtocol(ma.P_TCP)
if err != nil {
c.log.Error().Err(err).Msg("Invalid port in multiaddr")
continue
}

port, _ := strconv.Atoi(portStr)
port, err := strconv.Atoi(portStr)
if err != nil {
c.log.Error().Err(err).Msg("Invalid port number")
continue
}

isValidator := 1
isValidator := true
longLived := indexesFromBitfield(event.MetaData.Attnets)
shortLived := extractShortLivedSubnets(event.SubscribedSubnets, longLived)
// If there are no short lived subnets, then the peer is not a validator
if len(shortLived) == 0 {
isValidator = 0
isValidator = false
}

prevCache, found := c.validatorCache[event.ID]
Expand All @@ -412,17 +428,17 @@ func (c *Consumer) HandleValidatorMetadataEvent() error {
Port: uint16(port),
LastSeen: uint64(event.Timestamp),
LastEpoch: uint64(event.Epoch),
PossibleValidator: uint8(isValidator),
PossibleValidator: isValidator,
AverageValidatorCount: currAvgValidatorCount,
NumObservations: prevNumObservations + 1,
}
c.log.Info().Any("validator_metadata", validatorMetadata).Msg("Inserted validator metadata")

c.validatorCache[event.ID] = &validatorMetadata

// Write to Clickhouse
if c.chClient != nil {
c.chClient.ValidatorEventChan <- &validatorMetadata
c.log.Trace().Any("validator_metadata", validatorMetadata).Msg("Inserted validator metadata")
}
default:
c.log.Debug().Msg("No validator metadata event")
Expand Down

0 comments on commit 6c8da92

Please sign in to comment.