Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: duty scheduler indices change & reorg bleeps #1725

Open
wants to merge 15 commits into
base: stage
Choose a base branch
from
15 changes: 14 additions & 1 deletion logging/fields/fields.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,16 +81,17 @@ const (
FieldRole = "role"
FieldRound = "round"
FieldSlot = "slot"
FieldSlotTickerID = "slot_ticker_id"
FieldStartTimeUnixMilli = "start_time_unix_milli"
FieldSubmissionTime = "submission_time"
FieldTotalConsensusTime = "total_consensus_time"
FieldSubnets = "subnets"
FieldSyncOffset = "sync_offset"
FieldSyncResults = "sync_results"
FieldTargetNodeENR = "target_node_enr"
FieldToBlock = "to_block"
FieldTook = "took"
FieldTopic = "topic"
FieldTotalConsensusTime = "total_consensus_time"
FieldTxHash = "tx_hash"
FieldType = "type"
FieldUpdatedENRLocalNode = "updated_enr"
Expand Down Expand Up @@ -408,3 +409,15 @@ func Type(v any) zapcore.Field {
func FormatDuration(val time.Duration) string {
return strconv.FormatFloat(val.Seconds(), 'f', 5, 64)
}

func FormatSlotTickerID(epoch phase0.Epoch, slot phase0.Slot) string {
return fmt.Sprintf("e%v-s%v-#%v", epoch, slot, slot%32+1)
}

func FormatSlotTickerCommitteeID(period uint64, epoch phase0.Epoch, slot phase0.Slot) string {
return fmt.Sprintf("p%v-%s", period, FormatSlotTickerID(epoch, slot))
}

func SlotTickerID(val string) zap.Field {
return zap.String(FieldSlotTickerID, val)
}
163 changes: 72 additions & 91 deletions operator/duties/attester.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,31 +35,7 @@ func (h *AttesterHandler) Name() string {
return spectypes.BNRoleAttester.String()
}

// HandleDuties manages the duty lifecycle, handling different cases:
//
// On First Run:
// 1. Fetch duties for the current epoch.
// 2. If necessary, fetch duties for the next epoch.
// 3. Execute duties.
//
// On Re-org:
//
// If the previous dependent root changed:
// 1. Fetch duties for the current epoch.
// 2. Execute duties.
// If the current dependent root changed:
// 1. Execute duties.
// 2. If necessary, fetch duties for the next epoch.
//
// On Indices Change:
// 1. Execute duties.
// 2. ResetEpoch duties for the current epoch.
// 3. Fetch duties for the current epoch.
// 4. If necessary, fetch duties for the next epoch.
//
// On Ticker event:
// 1. Execute duties.
// 2. If necessary, fetch duties for the next epoch.
// HandleDuties manages the duty lifecycle
func (h *AttesterHandler) HandleDuties(ctx context.Context) {
h.logger.Info("starting duty handler")
defer h.logger.Info("duty handler exited")
Expand All @@ -75,67 +51,38 @@ func (h *AttesterHandler) HandleDuties(ctx context.Context) {
case <-next:
slot := h.ticker.Slot()
next = h.ticker.Next()
currentEpoch := h.network.Beacon.EstimatedEpochAtSlot(slot)
buildStr := fmt.Sprintf("e%v-s%v-#%v", currentEpoch, slot, slot%32+1)
h.logger.Debug("🛠 ticker event", zap.String("epoch_slot_pos", buildStr))

h.processExecution(currentEpoch, slot)
if h.indicesChanged {
h.duties.ResetEpoch(currentEpoch)
h.indicesChanged = false
}
h.processFetching(ctx, currentEpoch, slot)

slotsPerEpoch := h.network.Beacon.SlotsPerEpoch()

// If we have reached the mid-point of the epoch, fetch the duties for the next epoch in the next slot.
// This allows us to set them up at a time when the beacon node should be less busy.
if uint64(slot)%slotsPerEpoch == slotsPerEpoch/2-1 {
h.fetchNextEpoch = true
}

// last slot of epoch
if uint64(slot)%slotsPerEpoch == slotsPerEpoch-1 {
h.duties.ResetEpoch(currentEpoch - 1)
epoch := h.network.Beacon.EstimatedEpochAtSlot(slot)
tickerID := fields.FormatSlotTickerID(epoch, slot)
h.logger.Debug("🛠 ticker event", fields.SlotTickerID(tickerID))

if !h.network.PastAlanForkAtEpoch(epoch) {
h.processExecution(epoch, slot)
if h.indicesChanged {
h.duties.Reset(epoch)
h.indicesChanged = false
}
h.processFetching(ctx, epoch, slot)
h.processSlotTransition(epoch, slot)
}

case reorgEvent := <-h.reorg:
currentEpoch := h.network.Beacon.EstimatedEpochAtSlot(reorgEvent.Slot)
buildStr := fmt.Sprintf("e%v-s%v-#%v", currentEpoch, reorgEvent.Slot, reorgEvent.Slot%32+1)
h.logger.Info("🔀 reorg event received", zap.String("epoch_slot_pos", buildStr), zap.Any("event", reorgEvent))

// reset current epoch duties
if reorgEvent.Previous {
h.duties.ResetEpoch(currentEpoch)
h.fetchCurrentEpoch = true
if h.shouldFetchNexEpoch(reorgEvent.Slot) {
h.duties.ResetEpoch(currentEpoch + 1)
h.fetchNextEpoch = true
}
epoch := h.network.Beacon.EstimatedEpochAtSlot(reorgEvent.Slot)
tickerID := fields.FormatSlotTickerID(epoch, reorgEvent.Slot)
h.logger.Info("🔀 reorg event received", fields.SlotTickerID(tickerID), zap.Any("event", reorgEvent))

h.processFetching(ctx, currentEpoch, reorgEvent.Slot)
} else if reorgEvent.Current {
// reset & re-fetch next epoch duties if in appropriate slot range,
// otherwise they will be fetched by the appropriate slot tick.
if h.shouldFetchNexEpoch(reorgEvent.Slot) {
h.duties.ResetEpoch(currentEpoch + 1)
h.fetchNextEpoch = true
}
if !h.network.PastAlanForkAtEpoch(epoch) {
h.processReorg(ctx, epoch, reorgEvent)
}

case <-h.indicesChange:
slot := h.network.Beacon.EstimatedCurrentSlot()
currentEpoch := h.network.Beacon.EstimatedEpochAtSlot(slot)
buildStr := fmt.Sprintf("e%v-s%v-#%v", currentEpoch, slot, slot%32+1)
h.logger.Info("🔁 indices change received", zap.String("epoch_slot_pos", buildStr))

h.indicesChanged = true
h.fetchCurrentEpoch = true
epoch := h.network.Beacon.EstimatedEpochAtSlot(slot)
tickerID := fields.FormatSlotTickerID(epoch, slot)
h.logger.Info("🔁 indices change received", fields.SlotTickerID(tickerID))

// reset next epoch duties if in appropriate slot range
if h.shouldFetchNexEpoch(slot) {
h.duties.ResetEpoch(currentEpoch + 1)
h.fetchNextEpoch = true
if !h.network.PastAlanForkAtEpoch(epoch) {
h.indicesChanged = true
h.processIndicesChange(epoch, slot)
}
}
}
Expand Down Expand Up @@ -177,27 +124,61 @@ func (h *AttesterHandler) processExecution(epoch phase0.Epoch, slot phase0.Slot)
return
}

if !h.network.PastAlanForkAtEpoch(h.network.Beacon.EstimatedEpochAtSlot(slot)) {
toExecute := make([]*genesisspectypes.Duty, 0, len(duties)*2)
for _, d := range duties {
if h.shouldExecute(d) {
toExecute = append(toExecute, h.toGenesisSpecDuty(d, genesisspectypes.BNRoleAttester))
toExecute = append(toExecute, h.toGenesisSpecDuty(d, genesisspectypes.BNRoleAggregator))
}
toExecute := make([]*genesisspectypes.Duty, 0, len(duties)*2)
for _, d := range duties {
if h.shouldExecute(d) {
toExecute = append(toExecute, h.toGenesisSpecDuty(d, genesisspectypes.BNRoleAttester))
toExecute = append(toExecute, h.toGenesisSpecDuty(d, genesisspectypes.BNRoleAggregator))
}
}

h.dutiesExecutor.ExecuteGenesisDuties(h.logger, toExecute)
return
h.dutiesExecutor.ExecuteGenesisDuties(h.logger, toExecute)
}

func (h *AttesterHandler) processIndicesChange(epoch phase0.Epoch, slot phase0.Slot) {
h.fetchCurrentEpoch = true

// reset next epoch duties if in appropriate slot range
if h.shouldFetchNexEpoch(slot) {
h.duties.Reset(epoch + 1)
h.fetchNextEpoch = true
}
}

toExecute := make([]*spectypes.ValidatorDuty, 0, len(duties))
for _, d := range duties {
if h.shouldExecute(d) {
toExecute = append(toExecute, h.toSpecDuty(d, spectypes.BNRoleAggregator))
func (h *AttesterHandler) processReorg(ctx context.Context, epoch phase0.Epoch, reorgEvent ReorgEvent) {
// reset current epoch duties
if reorgEvent.Previous {
h.duties.Reset(epoch)
h.fetchCurrentEpoch = true
if h.shouldFetchNexEpoch(reorgEvent.Slot) {
h.duties.Reset(epoch + 1)
h.fetchNextEpoch = true
}

h.processFetching(ctx, epoch, reorgEvent.Slot)
} else if reorgEvent.Current {
// reset & re-fetch next epoch duties if in appropriate slot range,
// otherwise they will be fetched by the appropriate slot tick.
if h.shouldFetchNexEpoch(reorgEvent.Slot) {
h.duties.Reset(epoch + 1)
h.fetchNextEpoch = true
}
}
}

func (h *AttesterHandler) processSlotTransition(epoch phase0.Epoch, slot phase0.Slot) {
slotsPerEpoch := h.network.Beacon.SlotsPerEpoch()

h.dutiesExecutor.ExecuteDuties(h.logger, toExecute)
// If we have reached the mid-point of the epoch, fetch the duties for the next epoch in the next slot.
// This allows us to set them up at a time when the beacon node should be less busy.
if uint64(slot)%slotsPerEpoch == slotsPerEpoch/2-1 {
h.fetchNextEpoch = true
}
Comment on lines +176 to +180
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can use if h.shouldFetchNexEpoch(... here ? I guess applies to other Duties changed in this PR too.


// last slot of epoch
if uint64(slot)%slotsPerEpoch == slotsPerEpoch-1 {
h.duties.Reset(epoch - 1)
}
Comment on lines +182 to +185
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would be nice to have helper func similar to shouldFetchNexEpoch for uint64(slot)%slotsPerEpoch == slotsPerEpoch-1.

}

func (h *AttesterHandler) fetchAndProcessDuties(ctx context.Context, epoch phase0.Epoch) error {
Expand Down
Loading
Loading