Skip to content

Commit

Permalink
(BIDS-2670) batch machine metric inserts
Browse files Browse the repository at this point in the history
  • Loading branch information
peterbitfly committed Nov 12, 2023
1 parent 6ff245c commit 3b8030f
Show file tree
Hide file tree
Showing 2 changed files with 79 additions and 20 deletions.
95 changes: 75 additions & 20 deletions db/bigtable.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,9 @@ type Bigtable struct {
chainId string

v2SchemaCutOffEpoch uint64

machineMetricsQueuedWritesChan chan (types.BulkMutation)
machineMetricsQueuedWritesMux *sync.RWMutex

Check failure on line 78 in db/bigtable.go

View workflow job for this annotation

GitHub Actions / Run CI (ubuntu-latest, 1.20.x)

field machineMetricsQueuedWritesMux is unused (U1000)
}

func InitBigtable(project, instance, chainId, redisAddress string) (*Bigtable, error) {
Expand Down Expand Up @@ -110,26 +113,80 @@ func InitBigtable(project, instance, chainId, redisAddress string) (*Bigtable, e
}

bt := &Bigtable{
client: btClient,
tableData: btClient.Open("data"),
tableBlocks: btClient.Open("blocks"),
tableMetadataUpdates: btClient.Open("metadata_updates"),
tableMetadata: btClient.Open("metadata"),
tableBeaconchain: btClient.Open("beaconchain"),
tableMachineMetrics: btClient.Open("machine_metrics"),
tableValidators: btClient.Open("beaconchain_validators"),
tableValidatorsHistory: btClient.Open("beaconchain_validators_history"),
chainId: chainId,
redisCache: rdc,
lastAttestationCacheMux: &sync.Mutex{},
v2SchemaCutOffEpoch: utils.Config.Bigtable.V2SchemaCutOffEpoch,
client: btClient,
tableData: btClient.Open("data"),
tableBlocks: btClient.Open("blocks"),
tableMetadataUpdates: btClient.Open("metadata_updates"),
tableMetadata: btClient.Open("metadata"),
tableBeaconchain: btClient.Open("beaconchain"),
tableMachineMetrics: btClient.Open("machine_metrics"),
tableValidators: btClient.Open("beaconchain_validators"),
tableValidatorsHistory: btClient.Open("beaconchain_validators_history"),
chainId: chainId,
redisCache: rdc,
lastAttestationCacheMux: &sync.Mutex{},
v2SchemaCutOffEpoch: utils.Config.Bigtable.V2SchemaCutOffEpoch,
machineMetricsQueuedWritesChan: make(chan types.BulkMutation, MAX_BATCH_MUTATIONS),
}

BigtableClient = bt
return bt, nil
}

func (bigtable *Bigtable) commitQueuedMachineMetricWrites() {

Check failure on line 136 in db/bigtable.go

View workflow job for this annotation

GitHub Actions / Run CI (ubuntu-latest, 1.20.x)

func (*Bigtable).commitQueuedMachineMetricWrites is unused (U1000)

// copy the pending mutations over and commit them

batchSize := 10000

muts := &types.BulkMutations{
Keys: make([]string, 0, batchSize),
Muts: make([]*gcp_bigtable.Mutation, 0, batchSize),
}

tmr := time.NewTicker(time.Second * 10)
for {
select {
case mut, ok := <-bigtable.machineMetricsQueuedWritesChan:

if ok {
muts.Keys = append(muts.Keys, mut.Key)
muts.Muts = append(muts.Muts, mut.Mut)
}

if len(muts.Keys) >= batchSize || !ok { // commit when batch size is reached or on channel close
err := bigtable.WriteBulk(muts, bigtable.tableMachineMetrics)

if err == nil {
muts = &types.BulkMutations{
Keys: make([]string, 0, batchSize),
Muts: make([]*gcp_bigtable.Mutation, 0, batchSize),
}
} else {
logger.Errorf("error writing queued machine metrics to bigtable: %v", err)
}
}

case <-tmr.C:
if len(muts.Keys) > 0 {
err := bigtable.WriteBulk(muts, bigtable.tableMachineMetrics)

if err == nil {
muts = &types.BulkMutations{
Keys: make([]string, 0, batchSize),
Muts: make([]*gcp_bigtable.Mutation, 0, batchSize),
}
} else {
logger.Errorf("error writing queued machine metrics to bigtable: %v", err)
}
}
}
}

}

func (bigtable *Bigtable) Close() {
close(bigtable.machineMetricsQueuedWritesChan)
bigtable.client.Close()
}

Expand Down Expand Up @@ -167,15 +224,13 @@ func (bigtable *Bigtable) SaveMachineMetric(process string, userID uint64, machi
dataMut := gcp_bigtable.NewMutation()
dataMut.Set(MACHINE_METRICS_COLUMN_FAMILY, "v1", ts, data)

err = bigtable.tableMachineMetrics.Apply(
ctx,
rowKeyData,
dataMut,
)
if err != nil {
return err
bulkMut := types.BulkMutation{ // schedule the mutation for writing
Key: rowKeyData,
Mut: dataMut,
}

bigtable.machineMetricsQueuedWritesChan <- bulkMut

return nil
}

Expand Down
4 changes: 4 additions & 0 deletions types/eth1.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,3 +16,7 @@ type BulkMutations struct {
Keys []string
Muts []*gcp_bigtable.Mutation
}
type BulkMutation struct {
Key string
Mut *gcp_bigtable.Mutation
}

0 comments on commit 3b8030f

Please sign in to comment.