Skip to content

Commit

Permalink
tmp revert
Browse files Browse the repository at this point in the history
Signed-off-by: Gyuho Lee <[email protected]>
  • Loading branch information
gyuho committed Jan 20, 2025
1 parent 72abde9 commit e0d14b7
Show file tree
Hide file tree
Showing 29 changed files with 4,546 additions and 0 deletions.
Binary file added components/.DS_Store
Binary file not shown.
64 changes: 64 additions & 0 deletions components/query/config/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
// Package config provides the query/poller configuration.
package config

import (
"database/sql"
"time"

_ "github.com/mattn/go-sqlite3"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

const (
DefaultPollInterval = time.Minute
DefaultGetTimeout = 7 * time.Minute
DefaultQueueSize = 60
DefaultStateRetention = 30 * time.Minute
)

type Config struct {
Interval metav1.Duration `json:"interval"`

// Timeout for each get operation.
GetTimeout metav1.Duration `json:"get_timeout"`

QueueSize int `json:"queue_size"`
State *State `json:"state,omitempty"`
}

func DefaultConfig() Config {
return Config{
Interval: metav1.Duration{Duration: DefaultPollInterval},
GetTimeout: metav1.Duration{Duration: DefaultGetTimeout},
QueueSize: DefaultQueueSize,
State: &State{
Retention: metav1.Duration{Duration: DefaultStateRetention},
},
}
}

func (cfg *Config) SetDefaultsIfNotSet() {
if cfg.Interval.Duration == 0 {
cfg.Interval.Duration = DefaultPollInterval
}
if cfg.GetTimeout.Duration == 0 {
cfg.GetTimeout.Duration = DefaultGetTimeout
}
if cfg.QueueSize == 0 {
cfg.QueueSize = DefaultQueueSize
}
if cfg.State != nil && cfg.State.Retention.Duration == 0 {
cfg.State.Retention = metav1.Duration{Duration: DefaultStateRetention}
}
}

type State struct {
// DB instance for read-write.
DBRW *sql.DB `json:"-"`

// DB instance for read-only.
DBRO *sql.DB `json:"-"`

// Duration to keep states for.
Retention metav1.Duration `json:"retention"`
}
99 changes: 99 additions & 0 deletions components/query/config/config_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
package config

import (
"testing"
"time"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

func TestSetDefaultsIfNotSet(t *testing.T) {
t.Parallel()

tests := []struct {
name string
input Config
expected Config
}{
{
name: "All defaults",
input: Config{},
expected: Config{
Interval: metav1.Duration{Duration: DefaultPollInterval},
QueueSize: DefaultQueueSize,
},
},
{
name: "Custom interval",
input: Config{
Interval: metav1.Duration{Duration: 5 * time.Minute},
},
expected: Config{
Interval: metav1.Duration{Duration: 5 * time.Minute},
QueueSize: DefaultQueueSize,
},
},
{
name: "Custom queue size",
input: Config{
QueueSize: 100,
},
expected: Config{
Interval: metav1.Duration{Duration: DefaultPollInterval},
QueueSize: 100,
},
},
{
name: "State enabled without retention",
input: Config{
State: &State{},
},
expected: Config{
Interval: metav1.Duration{Duration: DefaultPollInterval},
QueueSize: DefaultQueueSize,
State: &State{
Retention: metav1.Duration{Duration: DefaultStateRetention},
},
},
},
{
name: "State enabled with custom retention",
input: Config{
State: &State{
Retention: metav1.Duration{Duration: 2 * time.Hour},
},
},
expected: Config{
Interval: metav1.Duration{Duration: DefaultPollInterval},
QueueSize: DefaultQueueSize,
State: &State{
Retention: metav1.Duration{Duration: 2 * time.Hour},
},
},
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
tt.input.SetDefaultsIfNotSet()

if tt.input.Interval.Duration != tt.expected.Interval.Duration {
t.Errorf("Interval mismatch: got %v, want %v", tt.input.Interval.Duration, tt.expected.Interval.Duration)
}

if tt.input.QueueSize != tt.expected.QueueSize {
t.Errorf("QueueSize mismatch: got %d, want %d", tt.input.QueueSize, tt.expected.QueueSize)
}

if (tt.input.State == nil) != (tt.expected.State == nil) {
t.Errorf("State mismatch: got %v, want %v", tt.input.State, tt.expected.State)
}

if tt.input.State != nil && tt.expected.State != nil {
if tt.input.State.Retention.Duration != tt.expected.State.Retention.Duration {
t.Errorf("State.Retention mismatch: got %v, want %v", tt.input.State.Retention.Duration, tt.expected.State.Retention.Duration)
}
}
})
}
}
2 changes: 2 additions & 0 deletions components/query/doc.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
// Package query provides the query/poller implementation.
package query
10 changes: 10 additions & 0 deletions components/query/log/common/common.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
// Package common provides the common log components.
package common

import (
"time"
)

type ExtractTimeFunc func([]byte) (time.Time, []byte, error)

type ProcessMatchedFunc func(parsedTime time.Time, line []byte, filter *Filter)
101 changes: 101 additions & 0 deletions components/query/log/common/filter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
package common

import (
"bytes"
"encoding/json"
"regexp"
"strings"

"sigs.k8s.io/yaml"
)

type Filter struct {
Name string `json:"name"`

Substring *string `json:"substring,omitempty"`

Regex *string `json:"regex,omitempty"`
regex *regexp.Regexp `json:"-"`

// OwnerReferences is a list of component names that watches on this filter.
// Useful when multiple components watch on the same log file.
// e.g., if the component X and Y both watch on the same log file,
// with the same filter rule (substring/regex), this field will be
// set to [x, y].
OwnerReferences []string `json:"owner_references,omitempty"`
}

func (f *Filter) JSON() ([]byte, error) {
return json.Marshal(f)
}

func ParseFilterJSON(data []byte) (*Filter, error) {
f := new(Filter)
if err := json.Unmarshal(data, f); err != nil {
return nil, err
}
return f, nil
}

func (f *Filter) YAML() ([]byte, error) {
return yaml.Marshal(f)
}

func ParseFilterYAML(data []byte) (*Filter, error) {
f := new(Filter)
err := yaml.Unmarshal(data, f)
if err != nil {
return nil, err
}
return f, nil
}

// Compiles the regex, if set.
func (f *Filter) Compile() error {
if f.Regex != nil {
rgx, err := regexp.Compile(*f.Regex)
if err != nil {
return err
}
f.regex = rgx
}
return nil
}

func (f *Filter) MatchString(line string) (bool, error) {
if f.Regex != nil && f.regex == nil {
if err := f.Compile(); err != nil {
return false, err
}
}
return f.matchString(line), nil
}

func (f *Filter) MatchBytes(line []byte) (bool, error) {
if f.Regex != nil && f.regex == nil {
if err := f.Compile(); err != nil {
return false, err
}
}
return f.matchBytes(line), nil
}

func (f *Filter) matchString(line string) bool {
if f.Substring != nil && strings.Contains(line, *f.Substring) {
return true
}
if f.regex != nil && f.regex.MatchString(line) {
return true
}
return false
}

func (f *Filter) matchBytes(line []byte) bool {
if f.Substring != nil && bytes.Contains(line, []byte(*f.Substring)) {
return true
}
if f.regex != nil && f.regex.Match(line) {
return true
}
return false
}
96 changes: 96 additions & 0 deletions components/query/log/config/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
// Package config provides the log poller configuration.
package config

import (
"context"
"encoding/json"
"errors"

query_config "github.com/leptonai/gpud/components/query/config"
query_log_common "github.com/leptonai/gpud/components/query/log/common"

"github.com/nxadm/tail"
)

const DefaultBufferSize = 2000

type Config struct {
Query query_config.Config `json:"query"`

BufferSize int `json:"buffer_size"`

File string `json:"file"`
Commands [][]string `json:"commands"`

// For each interval, execute the scanning operation
// based on the following config (rather than polling).
// This is to backtrack the old log messages.
Scan *Scan `json:"scan,omitempty"`

// "OR" conditions to select logs.
// An event is generated if any of the filters match.
// Useful for explicit blacklisting "error" logs
// (e.g., GPU error messages in dmesg).
SelectFilters []*query_log_common.Filter `json:"select_filters"`
// "AND" conditions to select logs.
// An event is generated if all of the filters do not match.
// Useful for explicit whitelisting logs and catch all other
// (e.g., good healthy log messages).
RejectFilters []*query_log_common.Filter `json:"reject_filters"`

SeekInfo *tail.SeekInfo `json:"seek_info,omitempty"`

// Used to commit the last seek info to disk.
SeekInfoSyncer func(ctx context.Context, file string, seekInfo tail.SeekInfo) `json:"-"`

// Parse time format
TimeParseFunc query_log_common.ExtractTimeFunc `json:"-"`
}

// For each interval, execute the scanning operation
// based on the following config (rather than polling).
// This is to backtrack the old log messages.
type Scan struct {
File string `json:"file"`
Commands [][]string `json:"commands"`
LinesToTail int `json:"lines_to_tail"`
}

func (cfg *Config) Validate() error {
if cfg.File == "" && len(cfg.Commands) == 0 {
return errors.New("file or commands must be set")
}
if cfg.Scan != nil {
if cfg.Scan.File == "" && len(cfg.Scan.Commands) == 0 {
return errors.New("file or commands must be set for scan")
}
}
if len(cfg.SelectFilters) > 0 && len(cfg.RejectFilters) > 0 {
return errors.New("cannot have both select and reject filters")
}
return nil
}

func (cfg *Config) SetDefaultsIfNotSet() {
cfg.Query.SetDefaultsIfNotSet()

if cfg.BufferSize == 0 {
cfg.BufferSize = DefaultBufferSize
}
if cfg.Query.QueueSize < cfg.BufferSize {
cfg.Query.QueueSize = cfg.BufferSize
}
}

func ParseConfig(b any) (*Config, error) {
raw, err := json.Marshal(b)
if err != nil {
return nil, err
}
cfg := new(Config)
err = json.Unmarshal(raw, cfg)
if err != nil {
return nil, err
}
return cfg, nil
}
2 changes: 2 additions & 0 deletions components/query/log/doc.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
// Package log provides the log file/output poller implementation.
package log
Loading

0 comments on commit e0d14b7

Please sign in to comment.