Skip to content

Commit

Permalink
Merge branch 'master' into express-lane-timeboost
Browse files Browse the repository at this point in the history
  • Loading branch information
tsahee authored Jan 29, 2025
2 parents 1b47664 + ba2642c commit c7e23f7
Show file tree
Hide file tree
Showing 3 changed files with 131 additions and 28 deletions.
17 changes: 17 additions & 0 deletions arbnode/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package arbnode

import (
"context"
"errors"
"fmt"
"time"

Expand Down Expand Up @@ -62,3 +63,19 @@ func (a *BlockValidatorDebugAPI) ValidationInputsAt(ctx context.Context, msgNum
) (server_api.InputJSON, error) {
return a.val.ValidationInputsAt(ctx, arbutil.MessageIndex(msgNum), target)
}

type MaintenanceAPI struct {
runner *MaintenanceRunner
}

func (a *MaintenanceAPI) SecondsSinceLastMaintenance(ctx context.Context) (int64, error) {
running, since := a.runner.TimeSinceLastMaintenance()
if running {
return 0, errors.New("maintenance currently running")
}
return int64(since.Seconds()), nil
}

func (a *MaintenanceAPI) Trigger(ctx context.Context) error {
return a.runner.Trigger()
}
131 changes: 104 additions & 27 deletions arbnode/maintenance.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,11 @@ package arbnode

import (
"context"
"errors"
"fmt"
"strconv"
"strings"
"sync/atomic"
"time"

flag "github.com/spf13/pflag"
Expand All @@ -28,16 +30,17 @@ type MaintenanceRunner struct {
config MaintenanceConfigFetcher
seqCoordinator *SeqCoordinator
dbs []ethdb.Database
lastMaintenance time.Time
lastMaintenance atomic.Int64

// lock is used to ensures that at any given time, only single node is on
// maintenance mode.
lock *redislock.Simple
}

type MaintenanceConfig struct {
TimeOfDay string `koanf:"time-of-day" reload:"hot"`
Lock redislock.SimpleCfg `koanf:"lock" reload:"hot"`
TimeOfDay string `koanf:"time-of-day" reload:"hot"`
Lock redislock.SimpleCfg `koanf:"lock" reload:"hot"`
Triggerable bool `koanf:"triggerable" reload:"hot"`

// Generated: the minutes since start of UTC day to compact at
minutesAfterMidnight int
Expand Down Expand Up @@ -74,13 +77,15 @@ func (c *MaintenanceConfig) Validate() error {
}

func MaintenanceConfigAddOptions(prefix string, f *flag.FlagSet) {
f.String(prefix+".time-of-day", DefaultMaintenanceConfig.TimeOfDay, "UTC 24-hour time of day to run maintenance (currently only db compaction) at (e.g. 15:00)")
f.String(prefix+".time-of-day", DefaultMaintenanceConfig.TimeOfDay, "UTC 24-hour time of day to run maintenance at (e.g. 15:00)")
f.Bool(prefix+".triggerable", DefaultMaintenanceConfig.Triggerable, "maintenance is triggerable via rpc")
redislock.AddConfigOptions(prefix+".lock", f)
}

var DefaultMaintenanceConfig = MaintenanceConfig{
TimeOfDay: "",
Lock: redislock.DefaultCfg,
TimeOfDay: "",
Lock: redislock.DefaultCfg,
Triggerable: false,

minutesAfterMidnight: 0,
}
Expand All @@ -93,13 +98,14 @@ func NewMaintenanceRunner(config MaintenanceConfigFetcher, seqCoordinator *SeqCo
return nil, fmt.Errorf("validating config: %w", err)
}
res := &MaintenanceRunner{
exec: exec,
config: config,
seqCoordinator: seqCoordinator,
dbs: dbs,
lastMaintenance: time.Now().UTC(),
exec: exec,
config: config,
seqCoordinator: seqCoordinator,
dbs: dbs,
}

// node restart is considered "maintenance"
res.lastMaintenance.Store(time.Now().UnixMilli())
if seqCoordinator != nil {
c := func() *redislock.SimpleCfg { return &cfg.Lock }
r := func() bool { return true } // always ready to lock
Expand All @@ -114,7 +120,7 @@ func NewMaintenanceRunner(config MaintenanceConfigFetcher, seqCoordinator *SeqCo

func (mr *MaintenanceRunner) Start(ctxIn context.Context) {
mr.StopWaiter.Start(ctxIn, mr)
mr.CallIteratively(mr.maybeRunMaintenance)
mr.CallIteratively(mr.maybeRunScheduledMaintenance)
}

func wentPastTimeOfDay(before time.Time, after time.Time, timeOfDay int) bool {
Expand All @@ -136,41 +142,110 @@ func wentPastTimeOfDay(before time.Time, after time.Time, timeOfDay int) bool {
return prevMinutes < dbCompactionMinutes && newMinutes >= dbCompactionMinutes
}

func (mr *MaintenanceRunner) maybeRunMaintenance(ctx context.Context) time.Duration {
// bool if running currently, if false - time of last time it was running
func (mr *MaintenanceRunner) getPrevMaintenance() (bool, time.Time) {
milli := mr.lastMaintenance.Load()
if milli == 0 {
return true, time.Time{}
}
return false, time.UnixMilli(milli)
}

// bool if running currently, if false - duration since last time it was running
func (mr *MaintenanceRunner) TimeSinceLastMaintenance() (bool, time.Duration) {
running, maintTime := mr.getPrevMaintenance()
if running {
return true, 0
}
return false, time.Since(maintTime)
}

func (mr *MaintenanceRunner) setMaintenanceDone() {
milli := time.Now().UnixMilli()
prev := mr.lastMaintenance.Swap(milli)
if prev != 0 {
log.Error("maintenance executed in parallel", "current", time.UnixMilli(milli), "prev", time.UnixMilli(prev))
}
}

func (mr *MaintenanceRunner) setMaintenanceStart() error {
prev := mr.lastMaintenance.Swap(0)
if prev == 0 {
return errors.New("already running")
}
return nil
}

func (mr *MaintenanceRunner) maybeRunScheduledMaintenance(ctx context.Context) time.Duration {
config := mr.config()
if !config.enabled {
return time.Minute
}

now := time.Now().UTC()

if !wentPastTimeOfDay(mr.lastMaintenance, now, config.minutesAfterMidnight) {
inMaintenance, lastMaintenance := mr.getPrevMaintenance()
if inMaintenance {
return time.Minute
}

if mr.seqCoordinator == nil {
mr.lastMaintenance = now
mr.runMaintenance()
if !wentPastTimeOfDay(lastMaintenance, now, config.minutesAfterMidnight) {
return time.Minute
}

err := mr.attemptMaintenance(ctx)
if err != nil {
log.Warn("scheduled maintenance error", "err", err)
}

return time.Minute
}

func (mr *MaintenanceRunner) Trigger() error {
if !mr.config().Triggerable {
return errors.New("maintenance not configured to be triggerable")
}
if running, _ := mr.getPrevMaintenance(); running {
return nil
}
// maintenance takes a long time, run on a separate thread
mr.LaunchThread(func(ctx context.Context) {
err := mr.attemptMaintenance(ctx)
if err != nil {
log.Warn("triggered maintenance returned error", "err", err)
}
})
return nil
}

func (mr *MaintenanceRunner) attemptMaintenance(ctx context.Context) error {
if mr.seqCoordinator == nil {
return mr.runMaintenance()
}

if !mr.lock.AttemptLock(ctx) {
return time.Minute
return errors.New("did not catch maintenance lock")
}
defer mr.lock.Release(ctx)

log.Info("Attempting avoiding lockout and handing off", "targetTime", config.TimeOfDay)
res := errors.New("maintenance failed to hand-off chosen one")

log.Info("Attempting avoiding lockout and handing off", "targetTime", mr.config().TimeOfDay)
// Avoid lockout for the sequencer and try to handoff.
if mr.seqCoordinator.AvoidLockout(ctx) && mr.seqCoordinator.TryToHandoffChosenOne(ctx) {
mr.lastMaintenance = now
mr.runMaintenance()
res = mr.runMaintenance()
}
defer mr.seqCoordinator.SeekLockout(ctx) // needs called even if c.Zombify returns false

return time.Minute
return res
}

func (mr *MaintenanceRunner) runMaintenance() {
func (mr *MaintenanceRunner) runMaintenance() error {
err := mr.setMaintenanceStart()
if err != nil {
return err
}
defer mr.setMaintenanceDone()

log.Info("Compacting databases and flushing triedb to disk (this may take a while...)")
results := make(chan error, len(mr.dbs))
expected := 0
Expand All @@ -186,10 +261,12 @@ func (mr *MaintenanceRunner) runMaintenance() {
results <- mr.exec.Maintenance()
}()
for i := 0; i < expected; i++ {
err := <-results
if err != nil {
log.Warn("maintenance error", "err", err)
subErr := <-results
if subErr != nil {
err = errors.Join(err, subErr)
log.Warn("maintenance error", "err", subErr)
}
}
log.Info("Done compacting databases and flushing triedb to disk")
return err
}
11 changes: 10 additions & 1 deletion arbnode/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -906,7 +906,16 @@ func CreateNode(
Public: false,
})
}

if currentNode.MaintenanceRunner != nil {
apis = append(apis, rpc.API{
Namespace: "maintenance",
Version: "1.0",
Service: &MaintenanceAPI{
runner: currentNode.MaintenanceRunner,
},
Public: false,
})
}
stack.RegisterAPIs(apis)

return currentNode, nil
Expand Down

0 comments on commit c7e23f7

Please sign in to comment.