Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix wrong sync committee participation association #2682

Merged
merged 12 commits into from
Nov 27, 2023
266 changes: 265 additions & 1 deletion cmd/misc/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,12 @@ import (
"net/http"
"strconv"
"strings"
"sync"
"time"

"github.com/coocood/freecache"
_ "github.com/jackc/pgx/v5/stdlib"
"github.com/pkg/errors"
utilMath "github.com/protolambda/zrnt/eth2/util/math"
"golang.org/x/sync/errgroup"

Expand Down Expand Up @@ -52,7 +54,7 @@ var opts = struct {

func main() {
configPath := flag.String("config", "config/default.config.yml", "Path to the config file")
flag.StringVar(&opts.Command, "command", "", "command to run, available: updateAPIKey, applyDbSchema, initBigtableSchema, epoch-export, debug-rewards, debug-blocks, clear-bigtable, index-old-eth1-blocks, update-aggregation-bits, historic-prices-export, index-missing-blocks, export-epoch-missed-slots, migrate-last-attestation-slot-bigtable, export-genesis-validators, update-block-finalization-sequentially, nameValidatorsByRanges, export-stats-totals")
flag.StringVar(&opts.Command, "command", "", "command to run, available: updateAPIKey, applyDbSchema, initBigtableSchema, epoch-export, debug-rewards, debug-blocks, clear-bigtable, index-old-eth1-blocks, update-aggregation-bits, historic-prices-export, index-missing-blocks, export-epoch-missed-slots, migrate-last-attestation-slot-bigtable, export-genesis-validators, update-block-finalization-sequentially, nameValidatorsByRanges, export-stats-totals, export-sync-committee-periods, export-sync-committee-validator-stats")
flag.Uint64Var(&opts.StartEpoch, "start-epoch", 0, "start epoch")
flag.Uint64Var(&opts.EndEpoch, "end-epoch", 0, "end epoch")
flag.Uint64Var(&opts.User, "user", 0, "user id")
Expand Down Expand Up @@ -359,6 +361,10 @@ func main() {
}
case "export-stats-totals":
exportStatsTotals(opts.Columns, opts.StartDay, opts.EndDay, opts.DataConcurrency)
case "export-sync-committee-periods":
exportSyncCommitteePeriods(rpcClient, opts.StartDay, opts.EndDay, opts.DryRun)
case "export-sync-committee-validator-stats":
exportSyncCommitteeValidatorStats(rpcClient, opts.StartDay, opts.EndDay, opts.DryRun, true)
case "fix-exec-transactions-count":
err = fixExecTransactionsCount()
default:
Expand Down Expand Up @@ -1233,3 +1239,261 @@ OUTER:

logrus.Infof("finished all exporting stats totals for columns '%v' for days %v - %v, took %v", columns, dayStart, dayEnd, time.Since(start))
}

/*
Instead of deleting entries from the sync_committee table in a prod environment and wait for the exporter to sync back all entries,
this method will replace each sync committee period one by one with the new one. Which is much nicer for a prod environment.
*/
func exportSyncCommitteePeriods(rpcClient rpc.Client, startDay, endDay uint64, dryRun bool) {
var lastEpoch = uint64(0)

firstPeriod := utils.SyncPeriodOfEpoch(utils.Config.Chain.ClConfig.AltairForkEpoch)
if startDay > 0 {
firstEpoch, _ := utils.GetFirstAndLastEpochForDay(startDay)
firstPeriod = utils.SyncPeriodOfEpoch(firstEpoch)
}

if endDay <= 0 {
var err error
lastEpoch, err = db.GetLatestFinalizedEpoch()
if err != nil {
utils.LogError(err, "error getting latest finalized epoch", 0)
return
}
if lastEpoch > 0 { // guard against underflows
lastEpoch = lastEpoch - 1
}
} else {
_, lastEpoch = utils.GetFirstAndLastEpochForDay(endDay)
}

lastPeriod := utils.SyncPeriodOfEpoch(uint64(lastEpoch)) + 1 // we can look into the future

start := time.Now()
for p := firstPeriod; p <= lastPeriod; p++ {
t0 := time.Now()

err := reExportSyncCommittee(rpcClient, p, dryRun)
if err != nil {
if strings.Contains(err.Error(), "not found 404") {
logrus.WithField("period", p).Infof("reached max period, stopping")
break
} else {
utils.LogError(err, "error re-exporting sync_committee", 0, map[string]interface{}{
"period": p,
})
return
}
}

logrus.WithFields(logrus.Fields{
"period": p,
"epoch": utils.FirstEpochOfSyncPeriod(p),
"duration": time.Since(t0),
}).Infof("re-exported sync_committee")
}

logrus.Infof("finished all exporting sync_committee for periods %v - %v, took %v", firstPeriod, lastPeriod, time.Since(start))
}

// exportSyncCommitteeValidatorStats re-exports the sync-duty statistics in
// validator_stats for days startDay..endDay. An endDay of zero is treated as
// "not set" and resolved to the day before the latest fully finalized day (the
// current day is picked up by the regular exporter). When dryRun is set,
// UpdateValidatorStatisticsSyncData only logs the would-be changes.
//
// NOTE(review): skipPhase1 is accepted for call-site compatibility but is not
// referenced in this function — confirm whether it is still needed.
func exportSyncCommitteeValidatorStats(rpcClient rpc.Client, startDay, endDay uint64, dryRun, skipPhase1 bool) {
	if endDay == 0 { // endDay is unsigned, so "not set" can only be zero
		lastEpoch, err := db.GetLatestFinalizedEpoch()
		if err != nil {
			utils.LogError(err, "error getting latest finalized epoch", 0)
			return
		}
		if lastEpoch > 0 { // guard against underflows
			lastEpoch--
		}

		if _, err = db.GetLastExportedStatisticDay(); err != nil {
			logrus.Infof("skipping exporting stats, first day has not been indexed yet")
			return
		}

		currentDay := lastEpoch / utils.EpochsPerDay()
		if currentDay == 0 {
			// currentDay-1 would underflow to MaxUint64 and loop "forever"
			logrus.Infof("skipping exporting stats, no fully finalized day yet")
			return
		}
		endDay = currentDay - 1 // current day will be picked up by exporter
	}

	start := time.Now()

	for day := startDay; day <= endDay; day++ {
		// named dayStart (not startDay) to avoid shadowing the parameter
		dayStart := time.Now()
		if err := UpdateValidatorStatisticsSyncData(day, rpcClient, dryRun); err != nil {
			utils.LogError(err, fmt.Errorf("error exporting stats for day %v", day), 0)
			break
		}

		logrus.Infof("finished updating validators_stats for day %v, took %v", day, time.Since(dayStart))
	}

	logrus.Infof("finished all exporting stats for days %v - %v, took %v", startDay, endDay, time.Since(start))
	logrus.Infof("REMEMBER: To execute export-stats-totals now to update the totals")
}

func UpdateValidatorStatisticsSyncData(day uint64, client rpc.Client, dryRun bool) error {
exportStart := time.Now()
firstEpoch, lastEpoch := utils.GetFirstAndLastEpochForDay(day)

logrus.Infof("exporting statistics for day %v (epoch %v to %v)", day, firstEpoch, lastEpoch)

if err := db.CheckIfDayIsFinalized(day); err != nil && !dryRun {
return err
}

logrus.Infof("getting exported state for day %v", day)

var err error
var maxValidatorIndex uint64
err = db.ReaderDb.Get(&maxValidatorIndex, `SELECT MAX(validatorindex) FROM validator_stats WHERE day = $1`, day)
if err != nil {
utils.LogFatal(err, "error: could not get max validator index", 0, map[string]interface{}{
"epoch": firstEpoch,
})
} else if maxValidatorIndex == uint64(0) {
utils.LogFatal(err, "error: no validator found", 0, map[string]interface{}{
"epoch": firstEpoch,
})
}
maxValidatorIndex += 10000 // add some buffer, exact number is not important. Should just be bigger than max validators that can join in a day

validatorData := make([]*types.ValidatorStatsTableDbRow, 0, maxValidatorIndex)
validatorDataMux := &sync.Mutex{}

logrus.Infof("processing statistics for validators 0-%d", maxValidatorIndex)
for i := uint64(0); i <= maxValidatorIndex; i++ {
validatorData = append(validatorData, &types.ValidatorStatsTableDbRow{
ValidatorIndex: i,
Day: int64(day),
})
}

g := &errgroup.Group{}

g.Go(func() error {
if err := db.GatherValidatorSyncDutiesForDay(nil, day, validatorData, validatorDataMux); err != nil {
return fmt.Errorf("error in GatherValidatorSyncDutiesForDay: %w", err)
}
return nil
})

err = g.Wait()
if err != nil {
return err
}

onlySyncCommitteeValidatorData := make([]*types.ValidatorStatsTableDbRow, 0, len(validatorData))
for index := range validatorData {

if validatorData[index].ParticipatedSync > 0 || validatorData[index].MissedSync > 0 || validatorData[index].OrphanedSync > 0 {
LuccaBitfly marked this conversation as resolved.
Show resolved Hide resolved
onlySyncCommitteeValidatorData = append(onlySyncCommitteeValidatorData, validatorData[index])
}
}

if len(onlySyncCommitteeValidatorData) == 0 {
return nil // no sync committee yet skip
}

logrus.Infof("statistics data collection for day %v completed", day)

var statisticsDataToday []*types.ValidatorStatsTableDbRow
if dryRun {
var err error
statisticsDataToday, err = db.GatherStatisticsForDay(int64(day)) // convert to int64 to avoid underflows
if err != nil {
return fmt.Errorf("error in GatherPreviousDayStatisticsData: %w", err)
}
}

tx, err := db.WriterDb.Beginx()
if err != nil {
return fmt.Errorf("error retrieving raw sql connection: %w", err)
}
defer tx.Rollback()

logrus.Infof("updating statistics data into the validator_stats table %v | %v", len(onlySyncCommitteeValidatorData), len(validatorData))

for _, data := range onlySyncCommitteeValidatorData {
if dryRun {
logrus.Infof(
"validator %v: participated sync: %v -> %v, missed sync: %v -> %v, orphaned sync: %v -> %v",
data.ValidatorIndex, statisticsDataToday[data.ValidatorIndex].ParticipatedSync, data.ParticipatedSync, statisticsDataToday[data.ValidatorIndex].MissedSync, data.MissedSync, statisticsDataToday[data.ValidatorIndex].OrphanedSync,
data.OrphanedSync,
)
} else {
tx.Exec(`
UPDATE validator_stats set
participated_sync = $1,
missed_sync = $2,
orphaned_sync = $3,
WHERE day = $4 AND validatorindex = $5`,
data.ParticipatedSync,
data.MissedSync,
data.OrphanedSync,
data.Day, data.ValidatorIndex)
}
}

if err != nil {
return err
}

err = tx.Commit()
if err != nil {
return fmt.Errorf("error during statistics data insert: %w", err)
}

logrus.Infof("statistics sync re-export of day %v completed, took %v", day, time.Since(exportStart))
return nil
}

// reExportSyncCommittee refreshes the sync_committees rows for a single period
// p. Without dryRun, the stored rows for the period are deleted and re-exported
// inside one transaction. With dryRun, the freshly derived committee is only
// compared against the stored one and committeeindex mismatches are logged.
func reExportSyncCommittee(rpcClient rpc.Client, p uint64, dryRun bool) error {
	if !dryRun {
		tx, err := db.WriterDb.Beginx()
		if err != nil {
			return errors.Wrap(err, "tx")
		}
		defer tx.Rollback()

		if _, err = tx.Exec(`DELETE FROM sync_committees WHERE period = $1`, p); err != nil {
			return errors.Wrap(err, "delete old entries")
		}

		if err = exporter.ExportSyncCommitteeAtPeriod(rpcClient, p, tx); err != nil {
			return errors.Wrap(err, "export")
		}

		return tx.Commit()
	}

	// dry run: load what is stored, derive the committee again, report diffs
	var storedRows []struct {
		ValidatorIndex uint64 `db:"validatorindex"`
		CommitteeIndex uint64 `db:"committeeindex"`
	}
	if err := db.WriterDb.Select(&storedRows, `SELECT validatorindex, committeeindex FROM sync_committees WHERE period = $1`, p); err != nil {
		return errors.Wrap(err, "select old entries")
	}

	freshRows, err := exporter.GetSyncCommitteAtPeriod(rpcClient, p)
	if err != nil {
		return errors.Wrap(err, "export")
	}

	// pairwise compare stored vs fresh entries and print any difference in
	// committeeindex (kept as nested loops: a validator may appear more than
	// once per committee, so a map keyed by validator index is not equivalent)
	for _, stored := range storedRows {
		for _, fresh := range freshRows {
			if stored.ValidatorIndex == fresh.ValidatorIndex && stored.CommitteeIndex != fresh.CommitteeIndex {
				logrus.Infof("validator %v has different committeeindex: %v -> %v", stored.ValidatorIndex, stored.CommitteeIndex, fresh.CommitteeIndex)
			}
		}
	}
	return nil
}
26 changes: 14 additions & 12 deletions db/statistics.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ func WriteValidatorStatisticsForDay(day uint64, client rpc.Client) error {

logger.Infof("exporting statistics for day %v (epoch %v to %v)", day, firstEpoch, lastEpoch)

if err := checkIfDayIsFinalized(day); err != nil {
if err := CheckIfDayIsFinalized(day); err != nil {
return err
}

Expand Down Expand Up @@ -105,7 +105,7 @@ func WriteValidatorStatisticsForDay(day uint64, client rpc.Client) error {
return nil
})
g.Go(func() error {
if err := gatherValidatorSyncDutiesForDay(validators, day, validatorData, validatorDataMux); err != nil {
if err := GatherValidatorSyncDutiesForDay(validators, day, validatorData, validatorDataMux); err != nil {
return fmt.Errorf("error in GatherValidatorSyncDutiesForDay: %w", err)
}
return nil
Expand Down Expand Up @@ -138,7 +138,7 @@ func WriteValidatorStatisticsForDay(day uint64, client rpc.Client) error {
var statisticsData1d []*types.ValidatorStatsTableDbRow
g.Go(func() error {
var err error
statisticsData1d, err = gatherStatisticsForDay(int64(day) - 1) // convert to int64 to avoid underflows
statisticsData1d, err = GatherStatisticsForDay(int64(day) - 1) // convert to int64 to avoid underflows
if err != nil {
return fmt.Errorf("error in GatherPreviousDayStatisticsData: %w", err)
}
Expand All @@ -147,7 +147,7 @@ func WriteValidatorStatisticsForDay(day uint64, client rpc.Client) error {
var statisticsData7d []*types.ValidatorStatsTableDbRow
g.Go(func() error {
var err error
statisticsData7d, err = gatherStatisticsForDay(int64(day) - 7) // convert to int64 to avoid underflows
statisticsData7d, err = GatherStatisticsForDay(int64(day) - 7) // convert to int64 to avoid underflows
if err != nil {
return fmt.Errorf("error in GatherPreviousDayStatisticsData: %w", err)
}
Expand All @@ -156,7 +156,7 @@ func WriteValidatorStatisticsForDay(day uint64, client rpc.Client) error {
var statisticsData31d []*types.ValidatorStatsTableDbRow
g.Go(func() error {
var err error
statisticsData31d, err = gatherStatisticsForDay(int64(day) - 31) // convert to int64 to avoid underflows
statisticsData31d, err = GatherStatisticsForDay(int64(day) - 31) // convert to int64 to avoid underflows
if err != nil {
return fmt.Errorf("error in GatherPreviousDayStatisticsData: %w", err)
}
Expand All @@ -165,7 +165,7 @@ func WriteValidatorStatisticsForDay(day uint64, client rpc.Client) error {
var statisticsData365d []*types.ValidatorStatsTableDbRow
g.Go(func() error {
var err error
statisticsData365d, err = gatherStatisticsForDay(int64(day) - 365) // convert to int64 to avoid underflows
statisticsData365d, err = GatherStatisticsForDay(int64(day) - 365) // convert to int64 to avoid underflows
if err != nil {
return fmt.Errorf("error in GatherPreviousDayStatisticsData: %w", err)
}
Expand Down Expand Up @@ -884,7 +884,7 @@ func gatherValidatorDepositWithdrawals(day uint64, data []*types.ValidatorStatsT
return nil
}

func gatherValidatorSyncDutiesForDay(validators []uint64, day uint64, data []*types.ValidatorStatsTableDbRow, mux *sync.Mutex) error {
func GatherValidatorSyncDutiesForDay(validators []uint64, day uint64, data []*types.ValidatorStatsTableDbRow, mux *sync.Mutex) error {
exportStart := time.Now()
defer func() {
metrics.TaskDuration.WithLabelValues("db_update_validator_sync_stats").Observe(time.Since(exportStart).Seconds())
Expand All @@ -898,9 +898,11 @@ func gatherValidatorSyncDutiesForDay(validators []uint64, day uint64, data []*ty
return nil
}
logger := logger.WithFields(logrus.Fields{
"day": day,
"firstEpoch": firstEpoch,
"lastEpoch": lastEpoch,
"day": day,
"firstEpoch": firstEpoch,
"lastEpoch": lastEpoch,
"startPeriod": utils.SyncPeriodOfEpoch(firstEpoch),
"endPeriod": utils.SyncPeriodOfEpoch(lastEpoch),
})
logger.Infof("gathering sync duties")

Expand Down Expand Up @@ -1105,7 +1107,7 @@ func gatherValidatorMissedAttestationsStatisticsForDay(validators []uint64, day
return nil
}

func gatherStatisticsForDay(day int64) ([]*types.ValidatorStatsTableDbRow, error) {
func GatherStatisticsForDay(day int64) ([]*types.ValidatorStatsTableDbRow, error) {

if day < 0 {
return nil, nil
Expand Down Expand Up @@ -1790,7 +1792,7 @@ func WriteGraffitiStatisticsForDay(day int64) error {
return nil
}

func checkIfDayIsFinalized(day uint64) error {
func CheckIfDayIsFinalized(day uint64) error {
_, lastEpoch := utils.GetFirstAndLastEpochForDay(day)

latestFinalizedEpoch, err := GetLatestFinalizedEpoch()
Expand Down
Loading
Loading