Skip to content

Commit

Permalink
use shared events table
Browse files Browse the repository at this point in the history
Signed-off-by: Gyuho Lee <[email protected]>
  • Loading branch information
gyuho committed Jan 20, 2025
1 parent e181744 commit 724875d
Show file tree
Hide file tree
Showing 3 changed files with 29 additions and 35 deletions.
14 changes: 10 additions & 4 deletions components/fd/component.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,8 @@ import (
"github.com/leptonai/gpud/components"
fd_id "github.com/leptonai/gpud/components/fd/id"
"github.com/leptonai/gpud/components/fd/metrics"
fd_state "github.com/leptonai/gpud/components/fd/state"
"github.com/leptonai/gpud/components/query"
"github.com/leptonai/gpud/components/state"
"github.com/leptonai/gpud/log"
"github.com/leptonai/gpud/pkg/poller"
poller_log "github.com/leptonai/gpud/pkg/poller/log"
Expand Down Expand Up @@ -114,20 +114,26 @@ func (c *component) Events(ctx context.Context, since time.Time) ([]components.E
return nil, nil
}

evs, err := fd_state.ReadEvents(ctx, c.cfg.PollerConfig.State.DBRO, fd_state.WithSince(since))
evs, err := state.ReadEvents(
ctx,
c.cfg.PollerConfig.State.DBRO,
state.WithEventType(EventVFSFileMaxLimitReached),
state.WithDataSource("dmesg"),
state.WithSince(since),
)
if err != nil {
return nil, err
}

events := make([]components.Event, 0)
for _, ev := range evs {
events = append(events, components.Event{
Time: metav1.Time{Time: time.Unix(ev.UnixSeconds, 0)},
Time: metav1.Time{Time: time.Unix(ev.Timestamp, 0)},
Name: ev.EventType,
Type: components.EventTypeCritical,
Message: "VFS file-max limit reached",
ExtraInfo: map[string]string{
EventKeyErrorVFSFileMaxLimitReachedUnixSeconds: strconv.FormatInt(ev.UnixSeconds, 10),
EventKeyErrorVFSFileMaxLimitReachedUnixSeconds: strconv.FormatInt(ev.Timestamp, 10),
EventKeyErrorVFSFileMaxLimitReachedLogLine: ev.EventDetails,
},
})
Expand Down
46 changes: 17 additions & 29 deletions components/fd/log_poller.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import (

fd_dmesg "github.com/leptonai/gpud/components/fd/dmesg"
fd_id "github.com/leptonai/gpud/components/fd/id"
fd_state "github.com/leptonai/gpud/components/fd/state"
"github.com/leptonai/gpud/components/state"
"github.com/leptonai/gpud/log"
"github.com/leptonai/gpud/pkg/dmesg"
poller_config "github.com/leptonai/gpud/pkg/poller/config"
Expand Down Expand Up @@ -45,29 +45,6 @@ var (
func setDefaultLogPoller(ctx context.Context, cfg poller_config.Config) error {
var err error
defaultLogPollerOnce.Do(func() {
if err = fd_state.CreateTable(ctx, cfg.State.DBRW); err != nil {
return
}
go func() {
dur := fd_state.DefaultRetentionPeriod
for {
select {
case <-ctx.Done():
return
case <-time.After(dur):
now := time.Now().UTC()
before := now.Add(-dur)

purged, err := fd_state.Purge(ctx, cfg.State.DBRW, fd_state.WithBefore(before))
if err != nil {
log.Logger.Warnw("failed to delete events", "error", err)
} else {
log.Logger.Debugw("deleted events", "before", before, "purged", purged)
}
}
}
}()

var cmds *dmesg.Commands
cmds, err = dmesg.GetCommands(ctx)
if err != nil {
Expand All @@ -81,13 +58,24 @@ func setDefaultLogPoller(ctx context.Context, cfg poller_config.Config) error {
SelectFilters: defaultDmesgFiltersForFileDescriptor(),
ExtractTime: cmds.ParseTimeFunc,
ProcessMatched: func(parsedTime time.Time, line []byte, filter *poller_log_common.Filter) {
if ierr := fd_state.InsertEvent(ctx, cfg.State.DBRW, fd_state.Event{
UnixSeconds: parsedTime.Unix(),
DataSource: "dmesg",
ev := state.Event{
Timestamp: parsedTime.Unix(),
EventType: filter.Name,
DataSource: "dmesg",
EventDetails: string(line),
}); ierr != nil {
log.Logger.Errorw("failed to insert event", "error", ierr)
}

found, err := state.FindEvent(ctx, cfg.State.DBRO, ev)
if err != nil {
log.Logger.Errorw("failed to find event", "error", err)
}
if found {
log.Logger.Debugw("event already exists", "event", ev)
return
}

if err := state.InsertEvent(ctx, cfg.State.DBRW, ev); err != nil {
log.Logger.Errorw("failed to insert event", "error", err)
}
},
}
Expand Down
4 changes: 2 additions & 2 deletions components/state/events_benchmark_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ func TestSimulatedEvents(t *testing.T) {
if err != nil {
t.Fatalf("failed to read db size: %v", err)
}
t.Logf("db size: %s", humanize.Bytes(uint64(size))) // 361 M
t.Logf("db size: %s", humanize.Bytes(size)) // 361 M

if err := sqlite.Compact(ctx, dbRW); err != nil {
t.Fatalf("failed to compact db: %v", err)
Expand All @@ -71,5 +71,5 @@ func TestSimulatedEvents(t *testing.T) {
if err != nil {
t.Fatalf("failed to read db size: %v", err)
}
t.Logf("db size: %s", humanize.Bytes(uint64(size))) // 341 MB
t.Logf("db size: %s", humanize.Bytes(size)) // 341 MB
}

0 comments on commit 724875d

Please sign in to comment.