diff --git a/cmd/misc/main.go b/cmd/misc/main.go index ac265938cc..395887c1af 100644 --- a/cmd/misc/main.go +++ b/cmd/misc/main.go @@ -12,14 +12,17 @@ import ( "eth2-exporter/utils" "eth2-exporter/version" "fmt" + "math" "math/big" "net/http" "strconv" "strings" + "sync" "time" "github.com/coocood/freecache" _ "github.com/jackc/pgx/v5/stdlib" + "github.com/pkg/errors" utilMath "github.com/protolambda/zrnt/eth2/util/math" "golang.org/x/sync/errgroup" @@ -52,7 +55,7 @@ var opts = struct { func main() { configPath := flag.String("config", "config/default.config.yml", "Path to the config file") - flag.StringVar(&opts.Command, "command", "", "command to run, available: updateAPIKey, applyDbSchema, initBigtableSchema, epoch-export, debug-rewards, debug-blocks, clear-bigtable, index-old-eth1-blocks, update-aggregation-bits, historic-prices-export, index-missing-blocks, export-epoch-missed-slots, migrate-last-attestation-slot-bigtable, export-genesis-validators, update-block-finalization-sequentially, nameValidatorsByRanges, export-stats-totals") + flag.StringVar(&opts.Command, "command", "", "command to run, available: updateAPIKey, applyDbSchema, initBigtableSchema, epoch-export, debug-rewards, debug-blocks, clear-bigtable, index-old-eth1-blocks, update-aggregation-bits, historic-prices-export, index-missing-blocks, export-epoch-missed-slots, migrate-last-attestation-slot-bigtable, export-genesis-validators, update-block-finalization-sequentially, nameValidatorsByRanges, export-stats-totals, export-sync-committee-periods, export-sync-committee-validator-stats") flag.Uint64Var(&opts.StartEpoch, "start-epoch", 0, "start epoch") flag.Uint64Var(&opts.EndEpoch, "end-epoch", 0, "end epoch") flag.Uint64Var(&opts.User, "user", 0, "user id") @@ -359,6 +362,10 @@ func main() { } case "export-stats-totals": exportStatsTotals(opts.Columns, opts.StartDay, opts.EndDay, opts.DataConcurrency) + case "export-sync-committee-periods": + exportSyncCommitteePeriods(rpcClient, opts.StartDay, opts.EndDay, opts.DryRun) + case "export-sync-committee-validator-stats": + exportSyncCommitteeValidatorStats(rpcClient, opts.StartDay, opts.EndDay, opts.DryRun, true) case "fix-exec-transactions-count": err = fixExecTransactionsCount() default: @@ -1121,6 +1128,12 @@ func exportHistoricPrices(dayStart uint64, dayEnd uint64) { func exportStatsTotals(columns string, dayStart, dayEnd, concurrency uint64) { start := time.Now() + exportToToday := false + if dayEnd <= 0 { + exportToToday = true + dayEnd = math.MaxInt + } + logrus.Infof("exporting stats totals for columns '%v'", columns) // validate columns input @@ -1228,8 +1241,274 @@ OUTER: "columns": columns, }) } + + if exportToToday { + dayEnd, err = db.GetLastExportedStatisticDay() + if err != nil { + utils.LogError(err, "error getting last exported statistic day", 0) + return + } + } logrus.Infof("finished exporting stats totals for columns '%v for day %v, took %v", columns, day, time.Since(timeDay)) } logrus.Infof("finished all exporting stats totals for columns '%v' for days %v - %v, took %v", columns, dayStart, dayEnd, time.Since(start)) } + +/* +Instead of deleting entries from the sync_committee table in a prod environment and wait for the exporter to sync back all entries, +this method will replace each sync committee period one by one with the new one. Which is much nicer for a prod environment. +*/ +func exportSyncCommitteePeriods(rpcClient rpc.Client, startDay, endDay uint64, dryRun bool) { + var lastEpoch = uint64(0) + + firstPeriod := utils.SyncPeriodOfEpoch(utils.Config.Chain.ClConfig.AltairForkEpoch) + if startDay > 0 { + firstEpoch, _ := utils.GetFirstAndLastEpochForDay(startDay) + firstPeriod = utils.SyncPeriodOfEpoch(firstEpoch) + } + + if endDay <= 0 { + var err error + lastEpoch, err = db.GetLatestFinalizedEpoch() + if err != nil { + utils.LogError(err, "error getting latest finalized epoch", 0) + return + } + if lastEpoch > 0 { // guard against underflows + lastEpoch = lastEpoch - 1 + } + } else { + _, lastEpoch = utils.GetFirstAndLastEpochForDay(endDay) + } + + lastPeriod := utils.SyncPeriodOfEpoch(uint64(lastEpoch)) + 1 // we can look into the future + + start := time.Now() + for p := firstPeriod; p <= lastPeriod; p++ { + t0 := time.Now() + + err := reExportSyncCommittee(rpcClient, p, dryRun) + if err != nil { + if strings.Contains(err.Error(), "not found 404") { + logrus.WithField("period", p).Infof("reached max period, stopping") + break + } else { + utils.LogError(err, "error re-exporting sync_committee", 0, map[string]interface{}{ + "period": p, + }) + return + } + } + + logrus.WithFields(logrus.Fields{ + "period": p, + "epoch": utils.FirstEpochOfSyncPeriod(p), + "duration": time.Since(t0), + }).Infof("re-exported sync_committee") + } + + logrus.Infof("finished all exporting sync_committee for periods %v - %v, took %v", firstPeriod, lastPeriod, time.Since(start)) +} + +func exportSyncCommitteeValidatorStats(rpcClient rpc.Client, startDay, endDay uint64, dryRun, skipPhase1 bool) { + if endDay <= 0 { + lastEpoch, err := db.GetLatestFinalizedEpoch() + if err != nil { + utils.LogError(err, "error getting latest finalized epoch", 0) + return + } + if lastEpoch > 0 { // guard against underflows + lastEpoch = lastEpoch - 1 + } + + _, err = db.GetLastExportedStatisticDay() + if err != nil { + logrus.Infof("skipping exporting stats, first day has not been indexed yet") + return + } + + epochsPerDay := utils.EpochsPerDay() + currentDay := lastEpoch / epochsPerDay + endDay = currentDay - 1 // current day will be picked up by exporter + } + + start := time.Now() + + for day := startDay; day <= endDay; day++ { + startDay := time.Now() + err := UpdateValidatorStatisticsSyncData(day, rpcClient, dryRun) + if err != nil { + utils.LogError(err, fmt.Errorf("error exporting stats for day %v", day), 0) + break + } + + logrus.Infof("finished updating validators_stats for day %v, took %v", day, time.Since(startDay)) + } + + logrus.Infof("finished all exporting stats for days %v - %v, took %v", startDay, endDay, time.Since(start)) + logrus.Infof("REMEMBER: To execute export-stats-totals now to update the totals") +} + +func UpdateValidatorStatisticsSyncData(day uint64, client rpc.Client, dryRun bool) error { + exportStart := time.Now() + firstEpoch, lastEpoch := utils.GetFirstAndLastEpochForDay(day) + + logrus.Infof("exporting statistics for day %v (epoch %v to %v)", day, firstEpoch, lastEpoch) + + if err := db.CheckIfDayIsFinalized(day); err != nil && !dryRun { + return err + } + + logrus.Infof("getting exported state for day %v", day) + + var err error + var maxValidatorIndex uint64 + err = db.ReaderDb.Get(&maxValidatorIndex, `SELECT MAX(validatorindex) FROM validator_stats WHERE day = $1`, day) + if err != nil { + utils.LogFatal(err, "error: could not get max validator index", 0, map[string]interface{}{ + "epoch": firstEpoch, + }) + } else if maxValidatorIndex == uint64(0) { + utils.LogFatal(err, "error: no validator found", 0, map[string]interface{}{ + "epoch": firstEpoch, + }) + } + maxValidatorIndex += 10000 // add some buffer, exact number is not important. Should just be bigger than max validators that can join in a day + + validatorData := make([]*types.ValidatorStatsTableDbRow, 0, maxValidatorIndex) + validatorDataMux := &sync.Mutex{} + + logrus.Infof("processing statistics for validators 0-%d", maxValidatorIndex) + for i := uint64(0); i <= maxValidatorIndex; i++ { + validatorData = append(validatorData, &types.ValidatorStatsTableDbRow{ + ValidatorIndex: i, + Day: int64(day), + }) + } + + g := &errgroup.Group{} + + g.Go(func() error { + if err := db.GatherValidatorSyncDutiesForDay(nil, day, validatorData, validatorDataMux); err != nil { + return fmt.Errorf("error in GatherValidatorSyncDutiesForDay: %w", err) + } + return nil + }) + + err = g.Wait() + if err != nil { + return err + } + + onlySyncCommitteeValidatorData := make([]*types.ValidatorStatsTableDbRow, 0, len(validatorData)) + for index := range validatorData { + + if validatorData[index].ParticipatedSync > 0 || validatorData[index].MissedSync > 0 || validatorData[index].OrphanedSync > 0 { + onlySyncCommitteeValidatorData = append(onlySyncCommitteeValidatorData, validatorData[index]) + } + } + + if len(onlySyncCommitteeValidatorData) == 0 { + return nil // no sync committee yet skip + } + + logrus.Infof("statistics data collection for day %v completed", day) + + var statisticsDataToday []*types.ValidatorStatsTableDbRow + if dryRun { + var err error + statisticsDataToday, err = db.GatherStatisticsForDay(int64(day)) // convert to int64 to avoid underflows + if err != nil { + return fmt.Errorf("error in GatherPreviousDayStatisticsData: %w", err) + } + } + + tx, err := db.WriterDb.Beginx() + if err != nil { + return fmt.Errorf("error retrieving raw sql connection: %w", err) + } + defer tx.Rollback() + + logrus.Infof("updating statistics data into the validator_stats table %v | %v", len(onlySyncCommitteeValidatorData), len(validatorData)) + + for _, data := range onlySyncCommitteeValidatorData { + if dryRun { + logrus.Infof( + "validator %v: participated sync: %v -> %v, missed sync: %v -> %v, orphaned sync: %v -> %v", + data.ValidatorIndex, statisticsDataToday[data.ValidatorIndex].ParticipatedSync, data.ParticipatedSync, statisticsDataToday[data.ValidatorIndex].MissedSync, data.MissedSync, statisticsDataToday[data.ValidatorIndex].OrphanedSync, + data.OrphanedSync, + ) + } else { + tx.Exec(` + UPDATE validator_stats set + participated_sync = $1, + missed_sync = $2, + orphaned_sync = $3, + WHERE day = $4 AND validatorindex = $5`, + data.ParticipatedSync, + data.MissedSync, + data.OrphanedSync, + data.Day, data.ValidatorIndex) + } + } + + if err != nil { + return err + } + + err = tx.Commit() + if err != nil { + return fmt.Errorf("error during statistics data insert: %w", err) + } + + logrus.Infof("statistics sync re-export of day %v completed, took %v", day, time.Since(exportStart)) + return nil +} + +func reExportSyncCommittee(rpcClient rpc.Client, p uint64, dryRun bool) error { + if dryRun { + var currentData []struct { + ValidatorIndex uint64 `db:"validatorindex"` + CommitteeIndex uint64 `db:"committeeindex"` + } + + err := db.WriterDb.Select(¤tData, `SELECT validatorindex, committeeindex FROM sync_committees WHERE period = $1`, p) + if err != nil { + return errors.Wrap(err, "select old entries") + } + + newData, err := exporter.GetSyncCommitteAtPeriod(rpcClient, p) + if err != nil { + return errors.Wrap(err, "export") + } + + // now we compare currentData with newData and print any difference in committeeindex + for _, d := range currentData { + for _, n := range newData { + if d.ValidatorIndex == n.ValidatorIndex && d.CommitteeIndex != n.CommitteeIndex { + logrus.Infof("validator %v has different committeeindex: %v -> %v", d.ValidatorIndex, d.CommitteeIndex, n.CommitteeIndex) + } + } + } + return nil + } else { + tx, err := db.WriterDb.Beginx() + if err != nil { + return errors.Wrap(err, "tx") + } + + defer tx.Rollback() + _, err = tx.Exec(`DELETE FROM sync_committees WHERE period = $1`, p) + if err != nil { + return errors.Wrap(err, "delete old entries") + } + + err = exporter.ExportSyncCommitteeAtPeriod(rpcClient, p, tx) + if err != nil { + return errors.Wrap(err, "export") + } + + return tx.Commit() + } +} diff --git a/db/statistics.go b/db/statistics.go index aa68381bd3..e3e48b7c2a 100644 --- a/db/statistics.go +++ b/db/statistics.go @@ -36,7 +36,7 @@ func WriteValidatorStatisticsForDay(day uint64, client rpc.Client) error { logger.Infof("exporting statistics for day %v (epoch %v to %v)", day, firstEpoch, lastEpoch) - if err := checkIfDayIsFinalized(day); err != nil { + if err := CheckIfDayIsFinalized(day); err != nil { return err } @@ -105,7 +105,7 @@ func WriteValidatorStatisticsForDay(day uint64, client rpc.Client) error { return nil }) g.Go(func() error { - if err := gatherValidatorSyncDutiesForDay(validators, day, validatorData, validatorDataMux); err != nil { + if err := GatherValidatorSyncDutiesForDay(validators, day, validatorData, validatorDataMux); err != nil { return fmt.Errorf("error in GatherValidatorSyncDutiesForDay: %w", err) } return nil @@ -138,7 +138,7 @@ func WriteValidatorStatisticsForDay(day uint64, client rpc.Client) error { var statisticsData1d []*types.ValidatorStatsTableDbRow g.Go(func() error { var err error - statisticsData1d, err = gatherStatisticsForDay(int64(day) - 1) // convert to int64 to avoid underflows + statisticsData1d, err = GatherStatisticsForDay(int64(day) - 1) // convert to int64 to avoid underflows if err != nil { return fmt.Errorf("error in GatherPreviousDayStatisticsData: %w", err) } @@ -147,7 +147,7 @@ func WriteValidatorStatisticsForDay(day uint64, client rpc.Client) error { var statisticsData7d []*types.ValidatorStatsTableDbRow g.Go(func() error { var err error - statisticsData7d, err = gatherStatisticsForDay(int64(day) - 7) // convert to int64 to avoid underflows + statisticsData7d, err = GatherStatisticsForDay(int64(day) - 7) // convert to int64 to avoid underflows if err != nil { return fmt.Errorf("error in GatherPreviousDayStatisticsData: %w", err) } @@ -156,7 +156,7 @@ func WriteValidatorStatisticsForDay(day uint64, client rpc.Client) error { var statisticsData31d []*types.ValidatorStatsTableDbRow g.Go(func() error { var err error - statisticsData31d, err = gatherStatisticsForDay(int64(day) - 31) // convert to int64 to avoid underflows + statisticsData31d, err = GatherStatisticsForDay(int64(day) - 31) // convert to int64 to avoid underflows if err != nil { return fmt.Errorf("error in GatherPreviousDayStatisticsData: %w", err) } @@ -165,7 +165,7 @@ func WriteValidatorStatisticsForDay(day uint64, client rpc.Client) error { var statisticsData365d []*types.ValidatorStatsTableDbRow g.Go(func() error { var err error - statisticsData365d, err = gatherStatisticsForDay(int64(day) - 365) // convert to int64 to avoid underflows + statisticsData365d, err = GatherStatisticsForDay(int64(day) - 365) // convert to int64 to avoid underflows if err != nil { return fmt.Errorf("error in GatherPreviousDayStatisticsData: %w", err) } @@ -884,7 +884,7 @@ func gatherValidatorDepositWithdrawals(day uint64, data []*types.ValidatorStatsT return nil } -func gatherValidatorSyncDutiesForDay(validators []uint64, day uint64, data []*types.ValidatorStatsTableDbRow, mux *sync.Mutex) error { +func GatherValidatorSyncDutiesForDay(validators []uint64, day uint64, data []*types.ValidatorStatsTableDbRow, mux *sync.Mutex) error { exportStart := time.Now() defer func() { metrics.TaskDuration.WithLabelValues("db_update_validator_sync_stats").Observe(time.Since(exportStart).Seconds()) @@ -898,9 +898,11 @@ func gatherValidatorSyncDutiesForDay(validators []uint64, day uint64, data []*ty return nil } logger := logger.WithFields(logrus.Fields{ - "day": day, - "firstEpoch": firstEpoch, - "lastEpoch": lastEpoch, + "day": day, + "firstEpoch": firstEpoch, + "lastEpoch": lastEpoch, + "startPeriod": utils.SyncPeriodOfEpoch(firstEpoch), + "endPeriod": utils.SyncPeriodOfEpoch(lastEpoch), }) logger.Infof("gathering sync duties") @@ -1105,7 +1107,7 @@ func gatherValidatorMissedAttestationsStatisticsForDay(validators []uint64, day return nil } -func gatherStatisticsForDay(day int64) ([]*types.ValidatorStatsTableDbRow, error) { +func GatherStatisticsForDay(day int64) ([]*types.ValidatorStatsTableDbRow, error) { if day < 0 { return nil, nil @@ -1790,7 +1792,7 @@ func WriteGraffitiStatisticsForDay(day int64) error { return nil } -func checkIfDayIsFinalized(day uint64) error { +func CheckIfDayIsFinalized(day uint64) error { _, lastEpoch := utils.GetFirstAndLastEpochForDay(day) latestFinalizedEpoch, err := GetLatestFinalizedEpoch() diff --git a/exporter/sync_committees.go b/exporter/sync_committees.go index ca140a8263..3f04286db5 100644 --- a/exporter/sync_committees.go +++ b/exporter/sync_committees.go @@ -10,6 +10,7 @@ import ( "strings" "time" + "github.com/jmoiron/sqlx" "github.com/sirupsen/logrus" ) @@ -44,7 +45,7 @@ func exportSyncCommittees(rpcClient rpc.Client) error { _, exists := dbPeriodsMap[p] if !exists { t0 := time.Now() - err = exportSyncCommitteeAtPeriod(rpcClient, p) + err = ExportSyncCommitteeAtPeriod(rpcClient, p, nil) if err != nil { return fmt.Errorf("error exporting sync-committee at period %v: %w", p, err) } @@ -58,7 +59,48 @@ func exportSyncCommittees(rpcClient rpc.Client) error { return nil } -func exportSyncCommitteeAtPeriod(rpcClient rpc.Client, p uint64) error { +func ExportSyncCommitteeAtPeriod(rpcClient rpc.Client, p uint64, providedTx *sqlx.Tx) error { + + data, err := GetSyncCommitteAtPeriod(rpcClient, p) + if err != nil { + return err + } + + tx := providedTx + if tx == nil { + tx, err = db.WriterDb.Beginx() + if err != nil { + return err + } + defer tx.Rollback() + } + + nArgs := 3 + valueArgs := make([]interface{}, len(data)*nArgs) + valueIds := make([]string, len(data)) + for i, entry := range data { + valueArgs[i*nArgs+0] = entry.Period + valueArgs[i*nArgs+1] = entry.ValidatorIndex + valueArgs[i*nArgs+2] = entry.CommitteeIndex + valueIds[i] = fmt.Sprintf("($%d,$%d,$%d)", i*nArgs+1, i*nArgs+2, i*nArgs+3) + } + _, err = tx.Exec( + fmt.Sprintf(` + INSERT INTO sync_committees (period, validatorindex, committeeindex) + VALUES %s ON CONFLICT (period, validatorindex, committeeindex) DO NOTHING`, + strings.Join(valueIds, ",")), + valueArgs...) + if err != nil { + return err + } + + if providedTx == nil { + return tx.Commit() + } + return nil +} + +func GetSyncCommitteAtPeriod(rpcClient rpc.Client, p uint64) ([]SyncCommittee, error) { stateID := uint64(0) if p > 0 { @@ -75,66 +117,31 @@ func exportSyncCommitteeAtPeriod(rpcClient rpc.Client, p uint64) error { logger.Infof("exporting sync committee assignments for period %v (epoch %v to %v)", p, firstEpoch, lastEpoch) + // Note that the order we receive the validators from the node in is crucial + // and determines which bit reflects them in the block sync aggregate bits c, err := rpcClient.GetSyncCommittee(fmt.Sprintf("%d", stateID), epoch) if err != nil { - return err + return nil, err } - validatorsU64 := make([]uint64, len(c.Validators)) + result := make([]SyncCommittee, len(c.Validators)) for i, idxStr := range c.Validators { idxU64, err := strconv.ParseUint(idxStr, 10, 64) if err != nil { - return err + return nil, err } - validatorsU64[i] = idxU64 + result = append(result, SyncCommittee{ + Period: p, + ValidatorIndex: idxU64, + CommitteeIndex: uint64(i), + }) } - dedupMap := make(map[uint64]bool) - - for _, validator := range validatorsU64 { - dedupMap[validator] = true - } - - validatorsU64 = make([]uint64, 0, len(dedupMap)) - for validator := range dedupMap { - validatorsU64 = append(validatorsU64, validator) - } - - // start := time.Now() - // - // firstSlot := firstEpoch * utils.Config.Chain.ClConfig.SlotsPerEpoch - // lastSlot := lastEpoch*utils.Config.Chain.ClConfig.SlotsPerEpoch + utils.Config.Chain.ClConfig.SlotsPerEpoch - 1 - // logger.Infof("exporting sync committee assignments for period %v (epoch %v to %v, slot %v to %v) to bigtable", p, firstEpoch, lastEpoch, firstSlot, lastSlot) - - // err = db.BigtableClient.SaveSyncCommitteesAssignments(firstSlot, lastSlot, validatorsU64) - // if err != nil { - // return fmt.Errorf("error saving sync committee assignments: %v", err) - // } - // logger.Infof("exported sync committee assignments for period %v to bigtable in %v", p, time.Since(start)) - tx, err := db.WriterDb.Beginx() - if err != nil { - return err - } - defer tx.Rollback() - - nArgs := 3 - valueArgs := make([]interface{}, len(validatorsU64)*nArgs) - valueIds := make([]string, len(validatorsU64)) - for i, idxU64 := range validatorsU64 { - valueArgs[i*nArgs+0] = p - valueArgs[i*nArgs+1] = idxU64 - valueArgs[i*nArgs+2] = i - valueIds[i] = fmt.Sprintf("($%d,$%d,$%d)", i*nArgs+1, i*nArgs+2, i*nArgs+3) - } - _, err = tx.Exec( - fmt.Sprintf(` - INSERT INTO sync_committees (period, validatorindex, committeeindex) - VALUES %s ON CONFLICT (period, validatorindex, committeeindex) DO NOTHING`, - strings.Join(valueIds, ",")), - valueArgs...) - if err != nil { - return err - } + return result, nil +} - return tx.Commit() +type SyncCommittee struct { + Period uint64 `json:"period"` + ValidatorIndex uint64 `json:"validatorindex"` + CommitteeIndex uint64 `json:"committeeindex"` } diff --git a/handlers/api.go b/handlers/api.go index 1b51248f1b..823a6956d1 100644 --- a/handlers/api.go +++ b/handlers/api.go @@ -732,6 +732,8 @@ func ApiSyncCommittee(w http.ResponseWriter, r *http.Request) { period = utils.SyncPeriodOfEpoch(services.LatestEpoch()) + 1 } + // Beware that we do not deduplicate here since a validator can be part multiple times of the same sync committee period + // and the order of the committeeindex is important, deduplicating it would mess up the order rows, err := db.ReaderDb.Query(`SELECT period, GREATEST(period*$2, $3) AS start_epoch, ((period+1)*$2)-1 AS end_epoch, ARRAY_AGG(validatorindex ORDER BY committeeindex) AS validators FROM sync_committees WHERE period = $1 GROUP BY period`, period, utils.Config.Chain.ClConfig.EpochsPerSyncCommitteePeriod, utils.Config.Chain.ClConfig.AltairForkEpoch) if err != nil { logger.WithError(err).WithField("url", r.URL.String()).Errorf("error querying db") @@ -1020,15 +1022,24 @@ func ApiDashboard(w http.ResponseWriter, r *http.Request) { } func getSyncCommitteeInfoForValidators(validators []uint64, period uint64) ([]interface{}, error) { - rows, err := db.ReaderDb.Query( - `SELECT - period, - GREATEST(period*$3, $4) AS start_epoch, - ((period+1)*$3)-1 AS end_epoch, - ARRAY_AGG(validatorindex ORDER BY committeeindex) AS validators - FROM sync_committees - WHERE period = $1 AND validatorindex = ANY($2) - GROUP BY period`, + rows, err := db.ReaderDb.Query(` + WITH + data as ( + SELECT + period, + validatorindex, + max(committeeindex) as committeeindex + FROM sync_committees + WHERE period = $1 AND validatorindex = ANY($2) + group by period, validatorindex + ) + SELECT + period, + GREATEST(period*$3, $4) AS start_epoch, + ((period+1)*$3)-1 AS end_epoch, + ARRAY_AGG(validatorindex ORDER BY committeeindex) AS validators + FROM data + group by period;`, period, pq.Array(validators), utils.Config.Chain.ClConfig.EpochsPerSyncCommitteePeriod, utils.Config.Chain.ClConfig.AltairForkEpoch, ) @@ -1218,7 +1229,7 @@ func getSyncCommitteeSlotsStatistics(validators []uint64, epoch uint64) (types.S Validators pq.Int64Array `db:"validators"` } query, args, err := sqlx.In(` - SELECT period, COALESCE(ARRAY_AGG(validatorindex), '{}') AS validators + SELECT period, COALESCE(ARRAY_AGG(distinct validatorindex), '{}') AS validators FROM sync_committees WHERE period IN (?) AND validatorindex IN (?) GROUP BY period diff --git a/handlers/validator.go b/handlers/validator.go index fa8a46fa31..7887d09cd8 100644 --- a/handlers/validator.go +++ b/handlers/validator.go @@ -1992,7 +1992,7 @@ func ValidatorSync(w http.ResponseWriter, r *http.Request) { var syncPeriods []uint64 = []uint64{} err = db.ReaderDb.Select(&syncPeriods, ` - SELECT period + SELECT distinct period FROM sync_committees WHERE validatorindex = $1 ORDER BY period desc`, validatorIndex)