Skip to content

Commit

Permalink
feat(fd): use new/simpler log poller
Browse files Browse the repository at this point in the history
Signed-off-by: Gyuho Lee <[email protected]>
  • Loading branch information
gyuho committed Jan 19, 2025
1 parent d360b0d commit 3ebddfd
Show file tree
Hide file tree
Showing 10 changed files with 1,273 additions and 201 deletions.
88 changes: 43 additions & 45 deletions components/fd/component.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,40 +5,59 @@ import (
"context"
"database/sql"
"fmt"
"os"
"runtime"
"strconv"
"time"

"github.com/leptonai/gpud/components"
"github.com/leptonai/gpud/components/dmesg"
fd_id "github.com/leptonai/gpud/components/fd/id"
"github.com/leptonai/gpud/components/fd/metrics"
fd_state "github.com/leptonai/gpud/components/fd/state"
"github.com/leptonai/gpud/components/query"
"github.com/leptonai/gpud/log"
"github.com/leptonai/gpud/pkg/poller"
poller_log "github.com/leptonai/gpud/pkg/poller/log"
"github.com/leptonai/gpud/pkg/process"

"github.com/prometheus/client_golang/prometheus"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

func New(ctx context.Context, cfg Config) components.Component {
cfg.Query.SetDefaultsIfNotSet()
setDefaultPoller(cfg)

func New(ctx context.Context, cfg Config) (components.Component, error) {
cctx, ccancel := context.WithCancel(ctx)
getDefaultPoller().Start(cctx, cfg.Query, fd_id.Name)

return &component{
cfg.PollerConfig.SetDefaultsIfNotSet()
setDefaultPoller(cfg)
getDefaultPoller().Start(cctx, fd_id.Name)

c := &component{
rootCtx: ctx,
cancel: ccancel,
cfg: cfg,
poller: getDefaultPoller(),
}

if runtime.GOOS == "linux" && process.CommandExists("dmesg") && os.Geteuid() == 0 {
if err := setDefaultLogPoller(ctx, cfg.PollerConfig); err != nil {
return nil, err
}
c.logPoller = getDefaultLogPoller()
c.logPoller.Start(cctx, fd_id.Name)
}

return c, nil
}

var _ components.Component = (*component)(nil)

type component struct {
rootCtx context.Context
cancel context.CancelFunc
poller query.Poller
gatherer prometheus.Gatherer
rootCtx context.Context
cancel context.CancelFunc
cfg Config
poller poller.Poller
logPoller poller_log.Poller
gatherer prometheus.Gatherer
}

func (c *component) Name() string { return fd_id.Name }
Expand Down Expand Up @@ -86,55 +105,30 @@ func (c *component) States(ctx context.Context) ([]components.State, error) {
}

// Event name and ExtraInfo keys associated with the "VFS file-max limit reached" event.
const (
// EventNameErrorVFSFileMaxLimitReached is the event name for the
// "VFS file-max limit reached" condition.
EventNameErrorVFSFileMaxLimitReached = "error_vfs_file_max_limit_reached"

// EventKeyErrorVFSFileMaxLimitReachedUnixSeconds is the ExtraInfo key whose
// value is the event time formatted as base-10 unix seconds.
EventKeyErrorVFSFileMaxLimitReachedUnixSeconds = "unix_seconds"
// EventKeyErrorVFSFileMaxLimitReachedLogLine is the ExtraInfo key whose
// value carries the log line / event details that produced the event.
EventKeyErrorVFSFileMaxLimitReachedLogLine = "log_line"
)

func (c *component) Events(ctx context.Context, since time.Time) ([]components.Event, error) {
dmesgC, err := components.GetComponent(dmesg.Name)
if err != nil {
return nil, err
if c.logPoller == nil {
return nil, nil
}

var dmesgComponent *dmesg.Component
if o, ok := dmesgC.(interface{ Unwrap() interface{} }); ok {
if unwrapped, ok := o.Unwrap().(*dmesg.Component); ok {
dmesgComponent = unwrapped
} else {
return nil, fmt.Errorf("expected *dmesg.Component, got %T", dmesgC)
}
}

// tailScan fetches the latest output from the dmesg
// it is ok to call this function multiple times for the following reasons (thus shared with events method)
// 1) dmesg "TailScan" is cheap (just tails the last x number of lines)
dmesgTailResults, err := dmesgComponent.TailScan()
evs, err := fd_state.ReadEvents(ctx, c.cfg.PollerConfig.State.DBRO, fd_state.WithSince(since))
if err != nil {
return nil, err
}

events := make([]components.Event, 0)
for _, logItem := range dmesgTailResults.TailScanMatched {
if logItem.Error != nil {
continue
}
if logItem.Matched == nil {
continue
}
if logItem.Matched.Name != dmesg.EventFileDescriptorVFSFileMaxLimitReached {
continue
}

for _, ev := range evs {
events = append(events, components.Event{
Time: logItem.Time,
Name: EventNameErrorVFSFileMaxLimitReached,
Time: metav1.Time{Time: time.Unix(ev.UnixSeconds, 0)},
Name: ev.EventType,
Type: components.EventTypeCritical,
Message: "VFS file-max limit reached",
ExtraInfo: map[string]string{
EventKeyErrorVFSFileMaxLimitReachedUnixSeconds: strconv.FormatInt(logItem.Time.Unix(), 10),
EventKeyErrorVFSFileMaxLimitReachedLogLine: logItem.Line,
EventKeyErrorVFSFileMaxLimitReachedUnixSeconds: strconv.FormatInt(ev.UnixSeconds, 10),
EventKeyErrorVFSFileMaxLimitReachedLogLine: ev.EventDetails,
},
})
}
Expand Down Expand Up @@ -190,7 +184,11 @@ func (c *component) Close() error {
log.Logger.Debugw("closing component")

// safe to call stop multiple times
c.poller.Stop(fd_id.Name)
_ = c.poller.Stop(fd_id.Name)

if c.logPoller != nil && runtime.GOOS == "linux" && process.CommandExists("dmesg") && os.Geteuid() == 0 {
_ = c.logPoller.Stop(fd_id.Name)
}

return nil
}
Expand Down
146 changes: 0 additions & 146 deletions components/fd/component_output.go
Original file line number Diff line number Diff line change
@@ -1,21 +1,12 @@
package fd

import (
"context"
"encoding/json"
"fmt"
"strconv"
"strings"
"sync"
"time"

"github.com/leptonai/gpud/components"
fd_id "github.com/leptonai/gpud/components/fd/id"
"github.com/leptonai/gpud/components/fd/metrics"
components_metrics "github.com/leptonai/gpud/components/metrics"
"github.com/leptonai/gpud/components/query"
"github.com/leptonai/gpud/pkg/file"
"github.com/leptonai/gpud/pkg/process"
)

type Output struct {
Expand Down Expand Up @@ -218,140 +209,3 @@ func (o *Output) States() ([]components.State, error) {

return []components.State{state}, nil
}

// Package-level default poller and the guard that makes its construction one-shot.
var (
// defaultPollerOnce ensures setDefaultPoller builds defaultPoller at most once.
defaultPollerOnce sync.Once
// defaultPoller is the value handed out by getDefaultPoller; it is assigned
// inside setDefaultPoller.
defaultPoller query.Poller
)

// setDefaultPoller builds the package-level default poller from cfg.
//
// Construction is guarded by defaultPollerOnce, so only the first call has any
// effect; the cfg passed to later calls is ignored.
func setDefaultPoller(cfg Config) {
	build := func() {
		defaultPoller = query.New(fd_id.Name, cfg.Query, CreateGet(cfg), nil)
	}
	defaultPollerOnce.Do(build)
}

// getDefaultPoller returns the package-level default poller as it currently
// stands. It performs no initialization itself; setDefaultPoller is what
// assigns the value.
func getDefaultPoller() query.Poller {
	p := defaultPoller
	return p
}

// CreateGet returns the query.GetFunc that collects file-descriptor statistics.
//
// Each call of the returned function:
//   - stamps the last-update metric with the current UTC time,
//   - reads allocated file handles, running PIDs, fd usage and the fd limit,
//   - publishes the raw values, derived percentages and the thresholds from
//     cfg as metrics,
//   - returns the same numbers packaged in an *Output.
//
// A failure of file.GetUsage is tolerated and only recorded in Output.Errors;
// any other failure aborts the call and is returned as the error. On return,
// the deferred hook reports success or failure to components_metrics based on
// the returned error.
func CreateGet(cfg Config) query.GetFunc {
	return func(ctx context.Context) (_ any, retErr error) {
		// Report the outcome of this call once the final error value is known.
		defer func() {
			if retErr == nil {
				components_metrics.SetGetSuccess(fd_id.Name)
				return
			}
			components_metrics.SetGetFailed(fd_id.Name)
		}()

		ts := time.Now().UTC()
		metrics.SetLastUpdateUnixSeconds(float64(ts.Unix()))

		handles, _, err := file.GetFileHandles()
		if err != nil {
			return nil, err
		}
		if err := metrics.SetAllocatedFileHandles(ctx, float64(handles), ts); err != nil {
			return nil, err
		}

		pids, err := process.CountRunningPids()
		if err != nil {
			return nil, err
		}
		if err := metrics.SetRunningPIDs(ctx, float64(pids), ts); err != nil {
			return nil, err
		}

		// Non-fatal problems are collected here and surfaced via Output.Errors.
		var errs []string

		// may fail for mac
		// e.g.,
		// stat /proc: no such file or directory
		usage, usageErr := file.GetUsage()
		if usageErr != nil {
			errs = append(errs, usageErr.Error())
		}

		limit, err := file.GetLimit()
		if err != nil {
			return nil, err
		}
		if err := metrics.SetLimit(ctx, float64(limit), ts); err != nil {
			return nil, err
		}

		handlesPct := calcUsagePct(handles, limit)
		if err := metrics.SetAllocatedFileHandlesPercent(ctx, handlesPct, ts); err != nil {
			return nil, err
		}

		// Fall back to the running PID count when no usage figure is available
		// (for mac).
		used := usage
		if used == 0 {
			used = pids
		}
		usedPct := calcUsagePct(used, limit)
		if err := metrics.SetUsedPercent(ctx, usedPct, ts); err != nil {
			return nil, err
		}

		handlesSupported := file.CheckFileHandlesSupported()
		var thresholdHandlesPct float64
		if handlesSupported && cfg.ThresholdAllocatedFileHandles > 0 {
			thresholdHandlesPct = calcUsagePct(handles, cfg.ThresholdAllocatedFileHandles)
		}
		if err := metrics.SetThresholdAllocatedFileHandles(ctx, float64(cfg.ThresholdAllocatedFileHandles)); err != nil {
			return nil, err
		}
		if err := metrics.SetThresholdAllocatedFileHandlesPercent(ctx, thresholdHandlesPct, ts); err != nil {
			return nil, err
		}

		limitSupported := file.CheckFDLimitSupported()
		var thresholdPIDsPct float64
		if limitSupported && cfg.ThresholdRunningPIDs > 0 {
			// NOTE(review): this percentage is computed from usage rather than
			// the running PID count — confirm that is intended.
			thresholdPIDsPct = calcUsagePct(usage, cfg.ThresholdRunningPIDs)
		}
		if err := metrics.SetThresholdRunningPIDs(ctx, float64(cfg.ThresholdRunningPIDs)); err != nil {
			return nil, err
		}
		if err := metrics.SetThresholdRunningPIDsPercent(ctx, thresholdPIDsPct, ts); err != nil {
			return nil, err
		}

		out := &Output{
			AllocatedFileHandles: handles,
			RunningPIDs:          pids,
			Usage:                usage,
			Limit:                limit,

			AllocatedFileHandlesPercent: fmt.Sprintf("%.2f", handlesPct),
			UsedPercent:                 fmt.Sprintf("%.2f", usedPct),

			ThresholdAllocatedFileHandles:        cfg.ThresholdAllocatedFileHandles,
			ThresholdAllocatedFileHandlesPercent: fmt.Sprintf("%.2f", thresholdHandlesPct),

			ThresholdRunningPIDs:        cfg.ThresholdRunningPIDs,
			ThresholdRunningPIDsPercent: fmt.Sprintf("%.2f", thresholdPIDsPct),

			FileHandlesSupported: handlesSupported,
			FDLimitSupported:     limitSupported,

			Errors: errs,
		}
		return out, nil
	}
}

// calcUsagePct expresses usage as a percentage of limit.
// A zero limit yields 0 rather than dividing by zero.
func calcUsagePct(usage, limit uint64) float64 {
	if limit == 0 {
		return 0
	}
	return float64(usage) / float64(limit) * 100
}
9 changes: 6 additions & 3 deletions components/fd/component_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import (
"testing"
"time"

query_config "github.com/leptonai/gpud/components/query/config"
poller_config "github.com/leptonai/gpud/pkg/poller/config"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)
Expand All @@ -16,14 +16,17 @@ func TestComponent(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

component := New(
component, err := New(
ctx,
Config{
Query: query_config.Config{
PollerConfig: poller_config.Config{
Interval: metav1.Duration{Duration: 5 * time.Second},
},
},
)
if err != nil {
t.Fatalf("failed to create component: %v", err)
}

time.Sleep(time.Second)

Expand Down
11 changes: 6 additions & 5 deletions components/fd/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,11 @@ import (
"database/sql"
"encoding/json"

query_config "github.com/leptonai/gpud/components/query/config"
poller_config "github.com/leptonai/gpud/pkg/poller/config"
)

type Config struct {
Query query_config.Config `json:"query"`
PollerConfig poller_config.Config `json:"poller_config"`

// ThresholdAllocatedFileHandles is the number of file descriptors that are currently allocated,
// at which we consider the system to be under high file descriptor usage.
Expand All @@ -31,9 +31,10 @@ func ParseConfig(b any, dbRW *sql.DB, dbRO *sql.DB) (*Config, error) {
if err != nil {
return nil, err
}
if cfg.Query.State != nil {
cfg.Query.State.DBRW = dbRW
cfg.Query.State.DBRO = dbRO
cfg.PollerConfig.SetDefaultsIfNotSet()
if cfg.PollerConfig.State != nil {
cfg.PollerConfig.State.DBRW = dbRW
cfg.PollerConfig.State.DBRO = dbRO
}
return cfg, nil
}
Expand Down
Loading

0 comments on commit 3ebddfd

Please sign in to comment.