Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

koordlet: refactor node metrics to tsdb #1316

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion hack/mock-gen.sh
Original file line number Diff line number Diff line change
Expand Up @@ -30,13 +30,16 @@ mockgen -source pkg/koordlet/statesinformer/states_informer.go \
mockgen -source pkg/koordlet/metriccache/tsdb_storage.go \
-destination pkg/koordlet/metriccache/mockmetriccache/mock_tsdb_storage.go \
-copyright_file ${LICENSE_HEADER_PATH}
mockgen -source pkg/koordlet/metriccache/kv_storage.go \
-destination pkg/koordlet/metriccache/mockmetriccache/mock_kv_storage.go \
-copyright_file ${LICENSE_HEADER_PATH}
mockgen -source pkg/koordlet/metriccache/metric_result.go \
-destination pkg/koordlet/metriccache/mockmetriccache/mock_metric_result.go \
-aux_files github.com/koordinator-sh/koordinator/pkg/koordlet/metriccache=pkg/koordlet/metriccache/metric_types.go \
-copyright_file ${LICENSE_HEADER_PATH}
mockgen -source pkg/koordlet/metriccache/metric_cache.go \
-destination pkg/koordlet/metriccache/mockmetriccache/mock.go \
-aux_files github.com/koordinator-sh/koordinator/pkg/koordlet/metriccache=pkg/koordlet/metriccache/tsdb_storage.go \
-aux_files github.com/koordinator-sh/koordinator/pkg/koordlet/metriccache=pkg/koordlet/metriccache/tsdb_storage.go,github.com/koordinator-sh/koordinator/pkg/koordlet/metriccache=pkg/koordlet/metriccache/kv_storage.go \
-copyright_file ${LICENSE_HEADER_PATH}
mockgen -source vendor/k8s.io/cri-api/pkg/apis/runtime/v1alpha2/api.pb.go \
-destination pkg/runtime/handler/mockclient/mock.go \
Expand Down
2 changes: 2 additions & 0 deletions pkg/koordlet/metriccache/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,8 @@ type NodeCPUInfo util.LocalCPUInfo

type NodeLocalStorageInfo util.LocalStorageInfo

type Devices util.Devices

type BECPUResourceMetric struct {
CPUUsed resource.Quantity // cpuUsed cores for BestEffort Cgroup
CPURealLimit resource.Quantity // suppressCPUQuantity: if suppress by cfs_quota then this value is cfs_quota/cfs_period
Expand Down
44 changes: 44 additions & 0 deletions pkg/koordlet/metriccache/kv_storage.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
/*
Copyright 2022 The Koordinator Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package metriccache

import "sync"

const (
GPUDeviceKey = "GPU"
)

type KVStorage interface {
Get(key interface{}) (interface{}, bool)
Set(key, value interface{})
}

type memoryStorage struct {
value sync.Map
}

func NewMemoryStorage() KVStorage {
return &memoryStorage{value: sync.Map{}}
}

func (ms *memoryStorage) Get(key interface{}) (interface{}, bool) {
return ms.value.Load(key)
}

func (ms *memoryStorage) Set(key, value interface{}) {
ms.value.Store(key, value)
}
85 changes: 85 additions & 0 deletions pkg/koordlet/metriccache/kv_storage_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
/*
Copyright 2022 The Koordinator Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package metriccache

import (
"testing"

"github.com/stretchr/testify/assert"
)

func Test_memoryStorage_Get(t *testing.T) {
type args struct {
key interface{}
}
tests := []struct {
name string
args args
want interface{}
want1 bool
}{
{
name: "exist-test1",
args: args{key: "test1"},
want: "test1",
want1: true,
},
{
name: "not-exist-test2",
args: args{key: "test2"},
want: nil,
want1: false,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
ms := NewMemoryStorage()
ms.Set("test1", "test1")
got, got1 := ms.Get(tt.args.key)
assert.Equalf(t, tt.want, got, "Get(%v)", tt.args.key)
assert.Equalf(t, tt.want1, got1, "Get(%v)", tt.args.key)
})
}
}

func Test_memoryStorage_Set(t *testing.T) {
type args struct {
key interface{}
value interface{}
}
tests := []struct {
name string
args args
}{
{
name: "test set",
args: args{
key: "test1",
value: "test1",
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
ms := NewMemoryStorage()
ms.Set(tt.args.key, tt.args.value)
value, exist := ms.Get(tt.args.key)
assert.True(t, exist)
assert.Equal(t, tt.args.value, value)
})
}
}
71 changes: 4 additions & 67 deletions pkg/koordlet/metriccache/metric_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,8 +73,8 @@ func (q *QueryParam) FillDefaultValue() {
type MetricCache interface {
Run(stopCh <-chan struct{}) error
TSDBStorage
KVStorage

GetNodeResourceMetric(param *QueryParam) NodeResourceQueryResult
GetPodResourceMetric(podUID *string, param *QueryParam) PodResourceQueryResult
GetContainerResourceMetric(containerID *string, param *QueryParam) ContainerResourceQueryResult
GetNodeCPUInfo(param *QueryParam) (*NodeCPUInfo, error)
Expand All @@ -96,6 +96,7 @@ type metricCache struct {
config *Config
db *storage
TSDBStorage
KVStorage
}

func NewMetricCache(cfg *Config) (MetricCache, error) {
Expand All @@ -107,10 +108,12 @@ func NewMetricCache(cfg *Config) (MetricCache, error) {
if err != nil {
return nil, err
}
kvdb := NewMemoryStorage()
return &metricCache{
config: cfg,
db: database,
TSDBStorage: tsdb,
KVStorage: kvdb,
}, nil
}

Expand All @@ -124,72 +127,6 @@ func (m *metricCache) Run(stopCh <-chan struct{}) error {
return nil
}

func (m *metricCache) GetNodeResourceMetric(param *QueryParam) NodeResourceQueryResult {
result := NodeResourceQueryResult{}
if param == nil || param.Start == nil || param.End == nil {
result.Error = fmt.Errorf("node query parameters are illegal %v", param)
return result
}
metrics, err := m.db.GetNodeResourceMetric(param.Start, param.End)
if err != nil {
result.Error = fmt.Errorf("get node resource metric failed, query params %v, error %v", param, err)
return result
}
if len(metrics) == 0 {
result.Metric = &NodeResourceMetric{}
return result
}

aggregateFunc := getAggregateFunc(param.Aggregate)
cpuUsed, err := aggregateFunc(metrics, AggregateParam{ValueFieldName: "CPUUsedCores", TimeFieldName: "Timestamp"})
if err != nil {
result.Error = fmt.Errorf("get node aggregate CPUUsedCores failed, metrics %v, error %v", metrics, err)
return result
}
memoryUsed, err := aggregateFunc(metrics, AggregateParam{ValueFieldName: "MemoryUsedBytes", TimeFieldName: "Timestamp"})
if err != nil {
result.Error = fmt.Errorf("get node aggregate MemoryUsedBytes failed, metrics %v, error %v", metrics, err)
return result
}

// gpu metrics time series.
// m.GPUs is a slice.
gpuUsagesByTime := make([][]gpuResourceMetric, 0)
for _, m := range metrics {
if len(m.GPUs) == 0 {
continue
}
gpuUsagesByTime = append(gpuUsagesByTime, m.GPUs)
}

var aggregateGPUMetrics []GPUMetric
if len(gpuUsagesByTime) > 0 {
aggregateGPUMetrics, err = m.aggregateGPUUsages(gpuUsagesByTime, aggregateFunc)
if err != nil {
result.Error = fmt.Errorf("get node aggregate GPUMetric failed, metrics %v, error %v", metrics, err)
return result
}
}

result.AggregateInfo, err = generateMetricAggregateInfo(metrics)
if err != nil {
result.Error = err
return result
}

result.Metric = &NodeResourceMetric{
CPUUsed: CPUMetric{
CPUUsed: *resource.NewMilliQuantity(int64(cpuUsed*1000), resource.DecimalSI),
},
MemoryUsed: MemoryMetric{
MemoryWithoutCache: *resource.NewQuantity(int64(memoryUsed), resource.BinarySI),
},
GPUs: aggregateGPUMetrics,
}

return result
}

func (m *metricCache) GetPodResourceMetric(podUID *string, param *QueryParam) PodResourceQueryResult {
result := PodResourceQueryResult{}
if podUID == nil || param == nil || param.Start == nil || param.End == nil {
Expand Down
Loading