Skip to content

Commit

Permalink
save all to clickhouse
Browse files Browse the repository at this point in the history
  • Loading branch information
hdser committed Jul 13, 2024
1 parent b722e0f commit 15b47d3
Show file tree
Hide file tree
Showing 6 changed files with 910 additions and 52,534 deletions.
210 changes: 153 additions & 57 deletions clickhouse/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,13 +32,67 @@ func ValidatorMetadataDDL(db string) string {
PRIMARY KEY (id, timestamp)`, db)
}

// IPMetadataDDL returns the CREATE TABLE IF NOT EXISTS statement for the
// ip_metadata table in database db. The table uses the MergeTree engine and
// is ordered by ip. db is interpolated into the statement verbatim.
func IPMetadataDDL(db string) string {
	const ddlTemplate = `
CREATE TABLE IF NOT EXISTS %s.ip_metadata (
ip String,
hostname String,
city String,
region String,
country String,
latitude Float64,
longitude Float64,
postal_code String,
asn String,
asn_organization String,
asn_type String
) ENGINE = MergeTree()
ORDER BY ip;
`
	statement := fmt.Sprintf(ddlTemplate, db)
	return statement
}

// PeerDiscoveredEventsDDL returns the CREATE TABLE IF NOT EXISTS statement for
// the peer_discovered_events table in database db. The table uses the
// MergeTree engine and is ordered by (timestamp, id). db is interpolated into
// the statement verbatim.
func PeerDiscoveredEventsDDL(db string) string {
	const ddlTemplate = `
CREATE TABLE IF NOT EXISTS %s.peer_discovered_events (
enr String,
id String,
ip String,
port Int32,
crawler_id String,
crawler_location String,
timestamp DateTime64(3, 'UTC')
) ENGINE = MergeTree()
ORDER BY (timestamp, id);
`
	statement := fmt.Sprintf(ddlTemplate, db)
	return statement
}

// MetadataReceivedEventsDDL returns the CREATE TABLE IF NOT EXISTS statement
// for the metadata_received_events table in database db. The table uses the
// MergeTree engine and is ordered by (timestamp, id). db is interpolated into
// the statement verbatim.
func MetadataReceivedEventsDDL(db string) string {
	const ddlTemplate = `
CREATE TABLE IF NOT EXISTS %s.metadata_received_events (
enr String,
id String,
multiaddr String,
epoch Int32,
subscribed_subnets Array(Int64),
client_version String,
crawler_id String,
crawler_location String,
timestamp DateTime64(3, 'UTC')
) ENGINE = MergeTree()
ORDER BY (timestamp, id);
`
	statement := fmt.Sprintf(ddlTemplate, db)
	return statement
}

// ClickhouseConfig holds the connection settings and the per-table batch
// sizes used by ClickhouseClient.
type ClickhouseConfig struct {
	Endpoint string
	// DB is the database name interpolated into the DDL statements.
	DB       string
	Username string
	Password string

	// Each Max*BatchSize is the number of rows the matching batch processor
	// accumulates before sending a batch to ClickHouse.
	MaxValidatorBatchSize              uint64
	MaxIPMetadataBatchSize             uint64
	MaxPeerDiscoveredEventsBatchSize   uint64
	MaxMetadataReceivedEventsBatchSize uint64
}

type ClickhouseClient struct {
Expand All @@ -47,9 +101,13 @@ type ClickhouseClient struct {

chConn driver.Conn

ValidatorEventChan chan *types.ValidatorEvent
ValidatorEventChan chan *types.ValidatorEvent
IPMetadataEventChan chan *types.IPMetadataEvent
PeerDiscoveredEventChan chan *types.PeerDiscoveredEvent
MetadataReceivedEventChan chan *types.MetadataReceivedEvent
}


func NewClickhouseClient(cfg *ClickhouseConfig) (*ClickhouseClient, error) {
log := log.NewLogger("clickhouse")

Expand Down Expand Up @@ -80,81 +138,119 @@ func NewClickhouseClient(cfg *ClickhouseConfig) (*ClickhouseClient, error) {
chConn: conn,

ValidatorEventChan: make(chan *types.ValidatorEvent, 16384),
IPMetadataEventChan: make(chan *types.IPMetadataEvent, 16384),
PeerDiscoveredEventChan: make(chan *types.PeerDiscoveredEvent, 16384),
MetadataReceivedEventChan: make(chan *types.MetadataReceivedEvent, 16384),
}, nil
}

// initializeTables executes the DDL for every table the client writes to, in
// a fixed order: validator_metadata, ip_metadata, peer_discovered_events,
// metadata_received_events. It stops at the first failure, logs it, and
// returns that error; it returns nil when every statement succeeds.
func (c *ClickhouseClient) initializeTables() error {
	steps := []struct {
		buildDDL func(db string) string
		failMsg  string
	}{
		{ValidatorMetadataDDL, "creating validator_metadata table"},
		{IPMetadataDDL, "creating ip_metadata table"},
		{PeerDiscoveredEventsDDL, "creating peer_discovered_events table"},
		{MetadataReceivedEventsDDL, "creating metadata_received_events table"},
	}

	for _, step := range steps {
		err := c.chConn.Exec(context.Background(), step.buildDDL(c.cfg.DB))
		if err == nil {
			continue
		}
		c.log.Error().Err(err).Msg(step.failMsg)
		return err
	}

	return nil
}

// Start prepares the ClickHouse schema and launches the background batch
// processors.
//
// It creates the configured database if it does not exist, creates all tables
// via initializeTables, and then starts one batchProcessor goroutine per event
// channel. It returns the first error from database or table creation; no
// goroutine is started in that case.
func (c *ClickhouseClient) Start() error {
	c.log.Info().
		Str("endpoint", c.cfg.Endpoint).
		Uint64("MaxValidatorBatchSize", c.cfg.MaxValidatorBatchSize).
		Uint64("MaxIPMetadataBatchSize", c.cfg.MaxIPMetadataBatchSize).
		Uint64("MaxPeerDiscoveredEventsBatchSize", c.cfg.MaxPeerDiscoveredEventsBatchSize).
		Uint64("MaxMetadataReceivedEventsBatchSize", c.cfg.MaxMetadataReceivedEventsBatchSize).
		Msg("Setting up ClickHouse database")

	// Every table is created as <db>.<table>, so the database has to exist
	// before initializeTables runs.
	if err := c.chConn.Exec(context.Background(), fmt.Sprintf("CREATE DATABASE IF NOT EXISTS %s", c.cfg.DB)); err != nil {
		c.log.Error().Err(err).Msg("creating database")
		return err
	}
	c.log.Info().Str("db", c.cfg.DB).Msg("Database created")

	// initializeTables already covers validator_metadata; it must not be
	// created a second time here.
	if err := c.initializeTables(); err != nil {
		return err
	}

	// Exactly one consumer per channel: the generic batchProcessor replaces
	// the old dedicated validator batcher.
	go batchProcessor(c, "validator_metadata", c.ValidatorEventChan, c.cfg.MaxValidatorBatchSize)
	go batchProcessor(c, "ip_metadata", c.IPMetadataEventChan, c.cfg.MaxIPMetadataBatchSize)
	go batchProcessor(c, "peer_discovered_events", c.PeerDiscoveredEventChan, c.cfg.MaxPeerDiscoveredEventsBatchSize)
	go batchProcessor(c, "metadata_received_events", c.MetadataReceivedEventChan, c.cfg.MaxMetadataReceivedEventsBatchSize)

	return nil
}

func (c *ClickhouseClient) validatorEventBatcher() {
var (
err error
batch driver.Batch
)

count := uint64(0)

for {
batch, err = c.chConn.PrepareBatch(context.Background(), fmt.Sprintf("INSERT INTO %s.validator_metadata", c.cfg.DB))
if err != nil {
c.log.Error().Err(err).Msg("preparing validator_metadata batch failed, retrying...")
} else {
break
}
// BatchProcessor processes events in batches for a specified table in ClickHouse.
func batchProcessor[T any](client *ClickhouseClient, tableName string, eventChan <-chan T, maxSize uint64) {
// Prepare the initial batch.
batch, err := client.chConn.PrepareBatch(context.Background(), fmt.Sprintf("INSERT INTO %s", tableName))
if err != nil {
client.log.Error().Err(err).Msg("Failed to prepare batch")
return
}

for row := range c.ValidatorEventChan {
if err := batch.AppendStruct(row); err != nil {
c.log.Error().Err(err).Msg("appending struct to validator_metadata batch")
var count uint64 = 0

// Process events from the channel.
for event := range eventChan {
client.log.Debug().Interface("event", event).Msg("Event received")

if err := batch.AppendStruct(event); err != nil {
client.log.Error().Err(err).Msg("Failed to append event to batch")
continue
}

count++
c.log.Debug().Uint64("count", count).Msg("appended struct to validator_metadata batch")

if count >= c.cfg.MaxValidatorBatchSize {
// Reset counter
count = 0

start := time.Now()
// Infinite retries for now
for {
if batch.IsSent() {
break
}

if err := batch.Send(); err != nil {
c.log.Error().Err(err).Msg("sending validator_metadata batch failed, retrying...")
} else {
break
}
// Check if the batch size has reached the maximum size.
if count >= maxSize {
client.log.Info().Uint64("batch_size", count).Msg("Max batch size reached, sending batch")
if err := sendBatch(client, batch); err != nil {
client.log.Error().Err(err).Msg("Failed to send batch, will retry")
continue
}

c.log.Info().Str("took", time.Since(start).String()).Int("channel_len", len(c.ValidatorEventChan)).Msg("Inserted validator_metadata batch")

// Reset batch
for {
batch, err = c.chConn.PrepareBatch(context.Background(), fmt.Sprintf("INSERT INTO %s.validator_metadata", c.cfg.DB))
if err != nil {
c.log.Error().Err(err).Msg("preparing validator_metadata batch (reset) failed, retrying")
} else {
break
}
// Prepare a new batch after sending the current batch.
batch, err = client.chConn.PrepareBatch(context.Background(), fmt.Sprintf("INSERT INTO %s", tableName))
if err != nil {
client.log.Error().Err(err).Msg("Failed to prepare new batch after sending")
return
}
count = 0
}
}

// Handle any remaining events in the final batch when the channel closes.
if count > 0 {
client.log.Info().Uint64("batch_size", count).Msg("Sending final batch")
if err := sendBatch(client, batch); err != nil {
client.log.Error().Err(err).Msg("Failed to send final batch")
}
}
}

// sendBatch calls Send on batch and logs the outcome through the client's
// logger: an info line on success, an error line on failure. The error from
// Send is returned unchanged; nil means the send succeeded.
func sendBatch(client *ClickhouseClient, batch driver.Batch) error {
	sendErr := batch.Send()
	if sendErr == nil {
		client.log.Info().Msg("Batch sent successfully")
		return nil
	}

	client.log.Error().Err(sendErr).Msg("Failed to send batch")
	return sendErr
}
32 changes: 25 additions & 7 deletions cmd/cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,9 +66,24 @@ var ConsumerCommand = &cli.Command{
Usage: "Dune API key",
},
&cli.Uint64Flag{
Name: "batch-size",
Name: "max-validator-batch-size",
Usage: "Clickhouse max validator batch size",
Value: 128,
Value: 16,
},
&cli.Uint64Flag{
Name: "max-ip-metadata-batch-size",
Usage: "Clickhouse max IP metadata batch size",
Value: 16,
},
&cli.Uint64Flag{
Name: "max-peer-discovered-events-batch-size",
Usage: "Clickhouse max peer discovered events batch size",
Value: 512,
},
&cli.Uint64Flag{
Name: "max-metadata-received-events-batch-size",
Usage: "Clickhouse max metadata received events batch size",
Value: 512,
},
},
}
Expand Down Expand Up @@ -101,11 +116,14 @@ func runConsumer(c *cli.Context) error {
DuneNamespace: c.String("dune.namespace"),
DuneApiKey: c.String("dune.api-key"),
ChCfg: clickhouse.ClickhouseConfig{
Endpoint: c.String("endpoint"),
DB: c.String("db"),
Username: c.String("username"),
Password: c.String("password"),
MaxValidatorBatchSize: c.Uint64("batch-size"),
Endpoint: c.String("endpoint"),
DB: c.String("db"),
Username: c.String("username"),
Password: c.String("password"),
MaxValidatorBatchSize: c.Uint64("max-validator-batch-size"),
MaxIPMetadataBatchSize: c.Uint64("max-ip-metadata-batch-size"),
MaxPeerDiscoveredEventsBatchSize: c.Uint64("max-peer-discovered-events-batch-size"),
MaxMetadataReceivedEventsBatchSize: c.Uint64("max-metadata-received-events-batch-size"),
},
}

Expand Down
Loading

0 comments on commit 15b47d3

Please sign in to comment.