diff --git a/db/db.go b/db/db.go index 54ea96e7ca..cc2f236387 100644 --- a/db/db.go +++ b/db/db.go @@ -3148,3 +3148,79 @@ func GetSyncParticipationBySlotRange(startSlot, endSlot uint64) (map[uint64]uint return ret, nil } + +// Should be used when retrieving data for a very large amount of validators (for the notifications process) +func GetValidatorAttestationHistoryForNotifications(startEpoch uint64, endEpoch uint64) (map[types.Epoch]map[types.ValidatorIndex]bool, error) { + // first retrieve activation & exit epoch for all validators + activityData := []struct { + ValidatorIndex types.ValidatorIndex + ActivationEpoch types.Epoch + ExitEpoch types.Epoch + }{} + + err := ReaderDb.Select(&activityData, "SELECT validatorindex, activationepoch, exitepoch FROM validators ORDER BY validatorindex;") + if err != nil { + return nil, fmt.Errorf("error retrieving activation & exit epoch for validators: %w", err) + } + + logger.Info("retrieved activation & exit epochs") + + // next retrieve all attestation data from the db (need to retrieve data for the endEpoch+1 epoch as that could still contain attestations for the endEpoch) + firstSlot := startEpoch * utils.Config.Chain.ClConfig.SlotsPerEpoch + lastSlot := ((endEpoch+1)*utils.Config.Chain.ClConfig.SlotsPerEpoch - 1) + lastQuerySlot := ((endEpoch+2)*utils.Config.Chain.ClConfig.SlotsPerEpoch - 1) + + rows, err := ReaderDb.Query(`SELECT + blocks_attestations.slot, + validators + FROM blocks_attestations + LEFT JOIN blocks ON blocks_attestations.block_root = blocks.blockroot WHERE + blocks_attestations.block_slot >= $1 AND blocks_attestations.block_slot <= $2 AND blocks.status = '1' ORDER BY block_slot`, firstSlot, lastQuerySlot) + if err != nil { + return nil, fmt.Errorf("error retrieving attestation data from the db: %w", err) + } + defer rows.Close() + + logger.Info("retrieved attestation raw data") + + // next process the data and fill up the epoch participation + // validators that participated in an epoch will have the flag set to true + // validators that missed their participation will have it set to false + epochParticipation := make(map[types.Epoch]map[types.ValidatorIndex]bool) + for rows.Next() { + var slot types.Slot + var attestingValidators pq.Int64Array + + err := rows.Scan(&slot, &attestingValidators) + if err != nil { + return nil, fmt.Errorf("error scanning attestation data: %w", err) + } + + if slot < types.Slot(firstSlot) || slot > types.Slot(lastSlot) { + continue + } + + epoch := types.Epoch(utils.EpochOfSlot(uint64(slot))) + + participation := epochParticipation[epoch] + + if participation == nil { + epochParticipation[epoch] = make(map[types.ValidatorIndex]bool) + + // logger.Infof("seeding validator duties for epoch %v", epoch) + for _, data := range activityData { + if data.ActivationEpoch <= epoch && epoch < data.ExitEpoch { + epochParticipation[epoch][types.ValidatorIndex(data.ValidatorIndex)] = false + } + } + + participation = epochParticipation[epoch] + } + + for _, validator := range attestingValidators { + participation[types.ValidatorIndex(validator)] = true + } + } + + return epochParticipation, nil +} diff --git a/services/notifications.go b/services/notifications.go index 1be3067e5b..c264cb5aed 100644 --- a/services/notifications.go +++ b/services/notifications.go @@ -1214,7 +1214,7 @@ func collectBlockProposalNotifications(notificationsByUserID map[uint64]map[type } for _, event := range events { - pubkey, err := GetGetPubkeyForIndex(event.Proposer) + pubkey, err := GetPubkeyForIndex(event.Proposer) if err != nil { utils.LogError(err, "error retrieving pubkey for validator", 0, map[string]interface{}{"validator": event.Proposer}) continue @@ -1358,8 +1358,6 @@ func collectAttestationAndOfflineValidatorNotifications(notificationsByUserID ma ValidatorIndex uint64 `db:"validatorindex"` Epoch uint64 `db:"epoch"` Status uint64 `db:"status"` - Slot uint64 `db:"attesterslot"` - InclusionSlot uint64 `db:"inclusionslot"` EventFilter []byte `db:"pubkey"` } @@ -1370,50 +1368,40 @@ func collectAttestationAndOfflineValidatorNotifications(notificationsByUserID ma return err } - attestations, err := db.BigtableClient.GetValidatorAttestationHistory(validators, epoch-3, epoch) + participationPerEpoch, err := db.GetValidatorAttestationHistoryForNotifications(epoch-3, epoch) if err != nil { - return fmt.Errorf("error getting validator attestations from bigtable %w", err) + return fmt.Errorf("error getting validator attestations from db %w", err) } logger.Infof("retrieved validator attestation history data") events := make([]dbResult, 0) - epochAttested := make(map[uint64]uint64) - epochTotal := make(map[uint64]uint64) - participationPerEpoch := make(map[uint64]map[uint64]int) // map[validatorindex]map[epoch]attested - for validator, history := range attestations { - for _, attestation := range history { - if participationPerEpoch[validator] == nil { - participationPerEpoch[validator] = make(map[uint64]int, 4) - } - epochTotal[attestation.Epoch] = epochTotal[attestation.Epoch] + 1 // count the total attestations for each epoch - - if attestation.Status == 0 { + epochAttested := make(map[types.Epoch]uint64) + epochTotal := make(map[types.Epoch]uint64) + for currentEpoch, participation := range participationPerEpoch { + for validatorIndex, participated := range participation { - participationPerEpoch[validator][attestation.Epoch] = 1 // missed + epochTotal[currentEpoch] = epochTotal[currentEpoch] + 1 // count the total attestations for each epoch - pubkey, err := GetGetPubkeyForIndex(validator) + if !participated { + pubkey, err := GetPubkeyForIndex(uint64(validatorIndex)) if err == nil { - if attestation.Epoch != epoch || subMap[hex.EncodeToString(pubkey)] == nil { + if currentEpoch != types.Epoch(epoch) || subMap[hex.EncodeToString(pubkey)] == nil { continue } events = append(events, dbResult{ - ValidatorIndex: validator, - Epoch: attestation.Epoch, - Status: attestation.Status, - Slot: attestation.AttesterSlot, - InclusionSlot: attestation.InclusionSlot, + ValidatorIndex: uint64(validatorIndex), + Epoch: uint64(currentEpoch), + Status: 0, EventFilter: pubkey, }) } else { - logger.Errorf("error retrieving pubkey for validator %v: %v", validator, err) + logger.Errorf("error retrieving pubkey for validator %v: %v", validatorIndex, err) } } else { - participationPerEpoch[validator][attestation.Epoch] = 2 // attested - - epochAttested[attestation.Epoch] = epochAttested[attestation.Epoch] + 1 // count the total attested attestation for each epoch (exlude missing) + epochAttested[currentEpoch] = epochAttested[currentEpoch] + 1 // count the total attested attestation for each epoch (exlude missing) } } } @@ -1443,8 +1431,6 @@ func collectAttestationAndOfflineValidatorNotifications(notificationsByUserID ma Epoch: event.Epoch, Status: event.Status, EventName: types.ValidatorMissedAttestationEventName, - Slot: event.Slot, - InclusionSlot: event.InclusionSlot, EventFilter: hex.EncodeToString(event.EventFilter), } if _, exists := notificationsByUserID[*sub.UserID]; !exists { @@ -1475,11 +1461,11 @@ func collectAttestationAndOfflineValidatorNotifications(notificationsByUserID ma var offlineValidators []*indexPubkeyPair var onlineValidators []*indexPubkeyPair - epochNMinus1 := epoch - 1 - epochNMinus2 := epoch - 2 - epochNMinus3 := epoch - 3 + epochNMinus1 := types.Epoch(epoch - 1) + epochNMinus2 := types.Epoch(epoch - 2) + epochNMinus3 := types.Epoch(epoch - 3) - if epochTotal[epoch] == 0 { + if epochTotal[types.Epoch(epoch)] == 0 { return fmt.Errorf("consistency error, did not retrieve attestation data for epoch %v", epoch) } if epochTotal[epochNMinus1] == 0 { @@ -1492,8 +1478,8 @@ func collectAttestationAndOfflineValidatorNotifications(notificationsByUserID ma return fmt.Errorf("consistency error, did not retrieve attestation data for epoch %v", epochNMinus3) } - if epochAttested[epoch]*100/epochTotal[epoch] < 60 { - return fmt.Errorf("consistency error, did receive more than 60%% of missed attestation in epoch %v (total: %v, attested: %v)", epoch, epochTotal[epoch], epochAttested[epoch]) + if epochAttested[types.Epoch(epoch)]*100/epochTotal[types.Epoch(epoch)] < 60 { + return fmt.Errorf("consistency error, did receive more than 60%% of missed attestation in epoch %v (total: %v, attested: %v)", epoch, epochTotal[types.Epoch(epoch)], epochAttested[types.Epoch(epoch)]) } if epochAttested[epochNMinus1]*100/epochTotal[epochNMinus1] < 60 { return fmt.Errorf("consistency error, did receive more than 60%% of missed attestation in epoch %v (total: %v, attested: %v)", epochNMinus1, epochTotal[epochNMinus1], epochAttested[epochNMinus1]) @@ -1505,24 +1491,25 @@ func collectAttestationAndOfflineValidatorNotifications(notificationsByUserID ma return fmt.Errorf("consistency error, did receive more than 60%% of missed attestation in epoch %v (total: %v, attested: %v)", epochNMinus3, epochTotal[epochNMinus3], epochAttested[epochNMinus3]) } - for validator, participation := range participationPerEpoch { - if participation[epochNMinus3] == 2 && participation[epochNMinus2] == 1 && participation[epochNMinus1] == 1 && participation[epoch] == 1 { + for _, validator := range validators { + if participationPerEpoch[epochNMinus3][types.ValidatorIndex(validator)] && !participationPerEpoch[epochNMinus2][types.ValidatorIndex(validator)] && !participationPerEpoch[epochNMinus1][types.ValidatorIndex(validator)] && !participationPerEpoch[types.Epoch(epoch)][types.ValidatorIndex(validator)] { logger.Infof("validator %v detected as offline in epoch %v (did not attest since epoch %v)", validator, epoch, epochNMinus2) - pubkey, err := GetGetPubkeyForIndex(validator) + pubkey, err := GetPubkeyForIndex(validator) if err != nil { return err } offlineValidators = append(offlineValidators, &indexPubkeyPair{Index: validator, Pubkey: pubkey}) } - if participation[epochNMinus3] == 1 && participation[epochNMinus2] == 1 && participation[epochNMinus1] == 1 && participation[epoch] == 2 { + if !participationPerEpoch[epochNMinus3][types.ValidatorIndex(validator)] && !participationPerEpoch[epochNMinus2][types.ValidatorIndex(validator)] && !participationPerEpoch[epochNMinus1][types.ValidatorIndex(validator)] && participationPerEpoch[types.Epoch(epoch)][types.ValidatorIndex(validator)] { logger.Infof("validator %v detected as online in epoch %v (attested again in epoch %v)", validator, epoch, epoch) - pubkey, err := GetGetPubkeyForIndex(validator) + pubkey, err := GetPubkeyForIndex(validator) if err != nil { return err } onlineValidators = append(onlineValidators, &indexPubkeyPair{Index: validator, Pubkey: pubkey}) } + } offlineValidatorsLimit := 5000 @@ -1733,8 +1720,6 @@ type validatorAttestationNotification struct { Epoch uint64 Status uint64 // * Can be 0 = scheduled | missed, 1 executed EventName types.EventName - Slot uint64 - InclusionSlot uint64 EventFilter string UnsubscribeHash sql.NullString } @@ -1760,19 +1745,17 @@ func (n *validatorAttestationNotification) GetInfo(includeUrl bool) string { if includeUrl { switch n.Status { case 0: - generalPart = fmt.Sprintf(`Validator %[1]v missed an attestation at slot %[2]v.`, n.ValidatorIndex, n.Slot, utils.Config.Frontend.SiteDomain) - //generalPart = fmt.Sprintf(`New scheduled attestation for Validator %v at slot %v.`, n.ValidatorIndex, n.Slot) + generalPart = fmt.Sprintf(`Validator %[1]v missed an attestation in epoch %[2]v.`, n.ValidatorIndex, n.Epoch, utils.Config.Frontend.SiteDomain) case 1: - generalPart = fmt.Sprintf(`Validator %[1]v submitted a successful attestation for slot %[2]v.`, n.ValidatorIndex, n.Slot, utils.Config.Frontend.SiteDomain) + generalPart = fmt.Sprintf(`Validator %[1]v submitted a successful attestation for epoch %[2]v.`, n.ValidatorIndex, n.Epoch, utils.Config.Frontend.SiteDomain) } // return generalPart + getUrlPart(n.ValidatorIndex) } else { switch n.Status { case 0: - generalPart = fmt.Sprintf(`Validator %v missed an attestation at slot %v.`, n.ValidatorIndex, n.Slot) - //generalPart = fmt.Sprintf(`New scheduled attestation for Validator %v at slot %v.`, n.ValidatorIndex, n.Slot) + generalPart = fmt.Sprintf(`Validator %v missed an attestation in epoch %v.`, n.ValidatorIndex, n.Epoch) case 1: - generalPart = fmt.Sprintf(`Validator %v submitted a successful attestation for slot %v.`, n.ValidatorIndex, n.Slot) + generalPart = fmt.Sprintf(`Validator %v submitted a successful attestation in epoch %v.`, n.ValidatorIndex, n.Epoch) } } return generalPart @@ -1807,9 +1790,9 @@ func (n *validatorAttestationNotification) GetInfoMarkdown() string { var generalPart = "" switch n.Status { case 0: - generalPart = fmt.Sprintf(`Validator [%[1]v](https://%[3]v/validator/%[1]v) missed an attestation at slot [%[2]v](https://%[3]v/slot/%[2]v).`, n.ValidatorIndex, n.Slot, utils.Config.Frontend.SiteDomain) + generalPart = fmt.Sprintf(`Validator [%[1]v](https://%[3]v/validator/%[1]v) missed an attestation in epoch [%[2]v](https://%[3]v/epoch/%[2]v).`, n.ValidatorIndex, n.Epoch, utils.Config.Frontend.SiteDomain) case 1: - generalPart = fmt.Sprintf(`Validator [%[1]v](https://%[3]v/validator/%[1]v) submitted a successful attestation for slot [%[2]v](https://%[3]v/slot/%[2]v).`, n.ValidatorIndex, n.Slot, utils.Config.Frontend.SiteDomain) + generalPart = fmt.Sprintf(`Validator [%[1]v](https://%[3]v/validator/%[1]v) submitted a successful attestation in epoch [%[2]v](https://%[3]v/epoch/%[2]v).`, n.ValidatorIndex, n.Epoch, utils.Config.Frontend.SiteDomain) } return generalPart } diff --git a/services/pubkeyCache.go b/services/pubkeyCache.go index 22c2be3b60..d93f1e7606 100644 --- a/services/pubkeyCache.go +++ b/services/pubkeyCache.go @@ -27,7 +27,7 @@ func initPubkeyCache(path string) error { } // will retrieve the pubkey for a given validatorindex and store it for later use -func GetGetPubkeyForIndex(index uint64) ([]byte, error) { +func GetPubkeyForIndex(index uint64) ([]byte, error) { key := []byte(fmt.Sprintf("%d", index)) pubkey, err := pubkeyCacheDb.Get(key, nil)