From 3b8ea47341cf2d4633ca90a86fe13c40e3635a61 Mon Sep 17 00:00:00 2001 From: ozline Date: Sun, 27 Oct 2024 00:36:44 +0800 Subject: [PATCH] feat: refactor resource-recommend controller --- .../app/controller/resourcerecommender.go | 19 +- .../app/options/resourcerecommender.go | 12 + go.mod | 7 +- go.sum | 11 +- pkg/client/control/resourcerecommend.go | 114 +++++ pkg/client/control/resourcerecommend_test.go | 206 +++++++++ pkg/config/controller/resourcerecommender.go | 6 + .../controller/controller.go | 193 --------- .../controller/controller_test.go | 89 ---- .../controller/oom_recorder_controller.go | 133 ++++-- .../oom_recorder_controller_test.go | 84 ++-- .../resourcerecommend_controller.go | 404 ++++++++++++++---- .../resourcerecommend_controller_test.go | 238 ++--------- .../resource-recommend/oom/oom_recorder.go | 36 +- .../oom/oom_recorder_test.go | 30 +- .../processor/manager/processor_manager.go | 7 +- .../processor/percentile/process_gc.go | 10 +- .../processor/percentile/process_gc_test.go | 13 +- .../processor/percentile/processor.go | 8 +- .../resource/k8s_resource.go | 70 ++- .../resource/k8s_resource_test.go | 243 +---------- .../resource/k8s_resource_test_util.go | 40 +- .../types/conditions/conditions_test.go | 15 +- .../types/recommendation/recommendation.go | 10 +- .../recommendation/recommendation_test.go | 48 +-- .../types/recommendation/validate.go | 9 +- .../types/recommendation/validate_test.go | 66 +-- 27 files changed, 1051 insertions(+), 1070 deletions(-) create mode 100644 pkg/client/control/resourcerecommend.go create mode 100644 pkg/client/control/resourcerecommend_test.go delete mode 100644 pkg/controller/resource-recommend/controller/controller.go delete mode 100644 pkg/controller/resource-recommend/controller/controller_test.go diff --git a/cmd/katalyst-controller/app/controller/resourcerecommender.go b/cmd/katalyst-controller/app/controller/resourcerecommender.go index cc6855736..1e83c8a27 100644 --- a/cmd/katalyst-controller/app/controller/resourcerecommender.go +++ b/cmd/katalyst-controller/app/controller/resourcerecommender.go @@ -32,19 +32,30 @@ const ( func StartResourceRecommenderController( ctx context.Context, - _ *katalyst.GenericContext, + controlCtx *katalyst.GenericContext, conf *config.Configuration, _ interface{}, _ string, ) (bool, error) { - resourceRecommenderController, err := controller.NewResourceRecommenderController(ctx, + oomRecorderController, err := controller.NewPodOOMRecorderController(ctx, controlCtx, conf.GenericConfiguration, + conf.GenericControllerConfiguration, conf.ControllersConfiguration.ResourceRecommenderConfig) if err != nil { - klog.Errorf("failed to new ResourceRecommender controller") + klog.Errorf("failed to new PodOOMRecorder controller") return false, err } + recController, err := controller.NewResourceRecommendController(ctx, controlCtx, + conf.GenericConfiguration, + conf.GenericControllerConfiguration, + conf.ControllersConfiguration.ResourceRecommenderConfig, + oomRecorderController.Recorder) + if err != nil { + klog.Errorf("failed to new ResourceRecommend Controller") + return false, err + } + go oomRecorderController.Run() + go recController.Run() - go resourceRecommenderController.Run() return true, nil } diff --git a/cmd/katalyst-controller/app/options/resourcerecommender.go b/cmd/katalyst-controller/app/options/resourcerecommender.go index 8836334e5..42c98801c 100644 --- a/cmd/katalyst-controller/app/options/resourcerecommender.go +++ b/cmd/katalyst-controller/app/options/resourcerecommender.go @@ -25,6 
+25,11 @@ import ( "github.com/kubewharf/katalyst-core/pkg/util/datasource/prometheus" ) +const ( + defaultRecSyncWorkers = 1 + defaultResourceRecommendReSyncPeriod = 24 * time.Hour +) + type ResourceRecommenderOptions struct { OOMRecordMaxNumber int `desc:"max number for oom record"` @@ -39,6 +44,9 @@ type ResourceRecommenderOptions struct { // LogVerbosityLevel to specify log verbosity level. (The default level is 4) // Set it to something larger than 4 if more detailed logs are needed. LogVerbosityLevel string + + RecSyncWorkers int + RecSyncPeriod time.Duration } // NewResourceRecommenderOptions creates a new Options with a default config. @@ -79,6 +87,8 @@ func (o *ResourceRecommenderOptions) AddFlags(fss *cliflag.NamedFlagSets) { fs.StringVar(&o.DataSourcePromConfig.BaseFilter, "resourcerecommend-prometheus-promql-base-filter", "", ""+ "Get basic filters in promql for historical usage data. This filter is added to all promql statements. "+ "Supports filters format of promql, e.g: group=\\\"Katalyst\\\",cluster=\\\"cfeaf782fasdfe\\\"") + fs.IntVar(&o.RecSyncWorkers, "res-sync-workers", defaultRecSyncWorkers, "num of goroutine to sync recs") + fs.DurationVar(&o.RecSyncPeriod, "resource-recommend-resync-period", defaultResourceRecommendReSyncPeriod, "period for recommend controller to sync resource recommend") } func (o *ResourceRecommenderOptions) ApplyTo(c *controller.ResourceRecommenderConfig) error { @@ -88,6 +98,8 @@ func (o *ResourceRecommenderOptions) ApplyTo(c *controller.ResourceRecommenderCo c.DataSource = o.DataSource c.DataSourcePromConfig = o.DataSourcePromConfig c.LogVerbosityLevel = o.LogVerbosityLevel + c.RecSyncWorkers = o.RecSyncWorkers + c.RecSyncPeriod = o.RecSyncPeriod return nil } diff --git a/go.mod b/go.mod index a93a30b46..cbd799140 100644 --- a/go.mod +++ b/go.mod @@ -5,6 +5,7 @@ go 1.18 require ( bou.ke/monkey v1.0.2 github.com/alecthomas/units v0.0.0-20190924025748-f65c72e2690d + github.com/bytedance/mockey v1.2.11 github.com/cespare/xxhash v1.1.0 github.com/cilium/ebpf v0.7.0 github.com/containerd/cgroups v1.0.1 @@ -29,6 +30,7 @@ require ( github.com/robfig/cron/v3 v3.0.1 github.com/samber/lo v1.39.0 github.com/slok/kubewebhook v0.11.0 + github.com/smartystreets/goconvey v1.6.4 github.com/spf13/cobra v1.6.0 github.com/spf13/pflag v1.0.5 github.com/stretchr/testify v1.8.1 @@ -93,6 +95,7 @@ require ( github.com/google/gnostic v0.6.9 // indirect github.com/google/go-cmp v0.5.9 // indirect github.com/google/gofuzz v1.2.0 // indirect + github.com/gopherjs/gopherjs v0.0.0-20200217142428-fce0ec30dd00 // indirect github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0 // indirect github.com/grpc-ecosystem/grpc-gateway v1.16.0 // indirect github.com/h2non/parth v0.0.0-20190131123155-b4df798d6542 // indirect @@ -101,6 +104,7 @@ require ( github.com/josharian/intern v1.0.0 // indirect github.com/jpillora/backoff v1.0.0 // indirect github.com/json-iterator/go v1.1.12 // indirect + github.com/jtolds/gls v4.20.0+incompatible // indirect github.com/mailru/easyjson v0.7.7 // indirect github.com/matttproud/golang_protobuf_extensions v1.0.2 // indirect github.com/mistifyio/go-zfs v2.1.2-0.20190413222219-f784269be439+incompatible // indirect @@ -117,6 +121,7 @@ require ( github.com/pmezard/go-difflib v1.0.0 // indirect github.com/prometheus/procfs v0.8.0 // indirect github.com/sirupsen/logrus v1.9.0 // indirect + github.com/smartystreets/assertions v1.1.0 // indirect github.com/stretchr/objx v0.5.0 // indirect go.etcd.io/etcd/api/v3 v3.5.4 // indirect go.etcd.io/etcd/client/pkg/v3 
v3.5.4 // indirect @@ -129,6 +134,7 @@ require ( go.opentelemetry.io/proto/otlp v0.7.0 // indirect go.uber.org/multierr v1.7.0 // indirect go.uber.org/zap v1.19.1 // indirect + golang.org/x/arch v0.0.0-20201008161808-52c3e6f60cff // indirect golang.org/x/crypto v0.14.0 // indirect golang.org/x/exp v0.0.0-20220303212507-bbda1eaf7a17 // indirect golang.org/x/net v0.17.0 // indirect @@ -136,7 +142,6 @@ require ( golang.org/x/sync v0.1.0 // indirect golang.org/x/term v0.13.0 // indirect golang.org/x/text v0.13.0 // indirect - gomodules.xyz/jsonpatch/v2 v2.2.0 // indirect gomodules.xyz/jsonpatch/v3 v3.0.1 // indirect gomodules.xyz/orderedmap v0.1.0 // indirect google.golang.org/appengine v1.6.7 // indirect diff --git a/go.sum b/go.sum index 10f905bca..def3c656b 100644 --- a/go.sum +++ b/go.sum @@ -127,6 +127,8 @@ github.com/blang/semver/v4 v4.0.0/go.mod h1:IbckMUScFkM3pff0VJDNKRiT6TG/YpiHIM2y github.com/boltdb/bolt v1.3.1/go.mod h1:clJnj/oiGkjum5o1McbSZDSLxVThjynRyGBgiAx27Ps= github.com/bombsimon/wsl/v3 v3.1.0/go.mod h1:st10JtZYLE4D5sC7b8xV4zTKZwAQjCH/Hy2Pm1FNZIc= github.com/buger/jsonparser v1.1.1/go.mod h1:6RYKKt7H4d4+iWqouImQ9R2FZql3VbhNgx27UK13J/0= +github.com/bytedance/mockey v1.2.11 h1:xIR17ILTtyeIh0iNTCbtslRnB7/N2o16wQvmtlbMivA= +github.com/bytedance/mockey v1.2.11/go.mod h1:bNrUnI1u7+pAc0TYDgPATM+wF2yzHxmNH+iDXg4AOCU= github.com/casbin/casbin/v2 v2.1.2/go.mod h1:YcPU1XXisHhLzuxH9coDNf2FbKpjGlbCg3n9yuLkIJQ= github.com/cenkalti/backoff v2.2.1+incompatible/go.mod h1:90ReRw6GdpyfrHakVjL/QHaoyV4aDUVVkXQJJJ3NXXM= github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU= @@ -250,7 +252,6 @@ github.com/envoyproxy/go-control-plane v0.10.2-0.20220325020618-49ff273808a1/go. github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c= github.com/envoyproxy/protoc-gen-validate v0.10.1 h1:c0g45+xCJhdgFGw7a5QAfdS4byAbud7miNWJ1WwEVf8= github.com/euank/go-kmsg-parser v2.0.0+incompatible/go.mod h1:MhmAMZ8V4CYH4ybgdRwPr2TU5ThnS43puaKEMpja1uw= -github.com/evanphx/json-patch v0.5.2/go.mod h1:ZWS5hhDbVDyob71nXKNL0+PWn6ToqBHMikGIFbs31qQ= github.com/evanphx/json-patch v4.5.0+incompatible/go.mod h1:50XU6AFN0ol/bzJsmQLiYLvXMP4fmwYFNcr97nuDLSk= github.com/evanphx/json-patch v4.11.0+incompatible/go.mod h1:50XU6AFN0ol/bzJsmQLiYLvXMP4fmwYFNcr97nuDLSk= github.com/evanphx/json-patch v4.12.0+incompatible/go.mod h1:50XU6AFN0ol/bzJsmQLiYLvXMP4fmwYFNcr97nuDLSk= @@ -454,6 +455,7 @@ github.com/googleapis/gax-go/v2 v2.0.5/go.mod h1:DWXyrwAJ9X0FpwwEdw+IPEYBICEFu5m github.com/gookit/color v1.2.5/go.mod h1:AhIE+pS6D4Ql0SQWbBeXPHw7gY0/sjHoA4s/n1KB7xg= github.com/gophercloud/gophercloud v0.1.0/go.mod h1:vxM41WHh5uqHVBMZHzuwNOHh8XEoIEcSTewFxm1c5g8= github.com/gopherjs/gopherjs v0.0.0-20181017120253-0766667cb4d1/go.mod h1:wJfORRmW1u3UXTncJ5qlYoELFm8eSnnEO6hX4iZ3EWY= +github.com/gopherjs/gopherjs v0.0.0-20200217142428-fce0ec30dd00 h1:l5lAOZEym3oK3SQ2HBHWsJUfbNBiTXJDeW2QDxw9AQ0= github.com/gopherjs/gopherjs v0.0.0-20200217142428-fce0ec30dd00/go.mod h1:wJfORRmW1u3UXTncJ5qlYoELFm8eSnnEO6hX4iZ3EWY= github.com/gorilla/context v1.1.1/go.mod h1:kBGZzfjB9CEq2AlWe17Uuf7NDRt0dE0s8S51q0aT7Yg= github.com/gorilla/mux v1.6.2/go.mod h1:1lud6UwP+6orDFRuTfBEV8e9/aOM/c4fVVCaMa2zaAs= @@ -518,7 +520,6 @@ github.com/inconshreveable/mousetrap v1.0.1 h1:U3uMjPSQEBMNp1lFxmllqCPM6P5u/Xq7P github.com/inconshreveable/mousetrap v1.0.1/go.mod h1:vpF70FUmC8bwa3OWnCshd2FqLfsEA9PFc4w1p2J65bw= github.com/influxdata/influxdb1-client 
v0.0.0-20191209144304-8bf82d3c094d/go.mod h1:qj24IKcXYK6Iy9ceXlo3Tc+vtHo9lIhSX5JddghvEPo= github.com/ishidawataru/sctp v0.0.0-20190723014705-7c296d48a2b5/go.mod h1:DM4VvS+hD/kDi1U1QsX2fnZowwBhqD0Dk3bRPKF/Oc8= -github.com/jessevdk/go-flags v1.4.0/go.mod h1:4FA24M0QyGHXBuZZK/XkWh8h0e1EYbRYJSGM75WSRxI= github.com/jingyugao/rowserrcheck v0.0.0-20191204022205-72ab7603b68a/go.mod h1:xRskid8CManxVta/ALEhJha/pweKBaVG6fWgc0yH25s= github.com/jirfag/go-printf-func-name v0.0.0-20191110105641-45db9963cdd3/go.mod h1:HEWGJkRDzjJY2sqdDwxccsGicWEf9BQOZsq2tV+xzM0= github.com/jmespath/go-jmespath v0.0.0-20180206201540-c2b33e8439af/go.mod h1:Nht3zPeWKUH0NzdCt2Blrr5ys8VGpn0CEB0cQHVjt7k= @@ -542,6 +543,7 @@ github.com/json-iterator/go v1.1.12 h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnr github.com/json-iterator/go v1.1.12/go.mod h1:e30LSqwooZae/UwlEbR2852Gd8hjQvJoHmT4TnhNGBo= github.com/jstemmer/go-junit-report v0.0.0-20190106144839-af01ea7f8024/go.mod h1:6v2b51hI/fHJwM22ozAgKL4VKDeJcHhJFhtBdhmNjmU= github.com/jstemmer/go-junit-report v0.9.1/go.mod h1:Brl9GWCQeLvo8nXZwPNNblvFj/XSXhF0NWZEnDohbsk= +github.com/jtolds/gls v4.20.0+incompatible h1:xdiiI2gbIgH/gLH7ADydsJ1uDOEzR8yvV7C0MuV77Wo= github.com/jtolds/gls v4.20.0+incompatible/go.mod h1:QJZ7F/aHp+rZTRtaJ1ow/lLfFfVYBRgL+9YlvaHOwJU= github.com/julienschmidt/httprouter v1.2.0/go.mod h1:SYymIcj16QtmaHHD7aYtjjsJG7VTCxuUUipMqKk8s4w= github.com/julienschmidt/httprouter v1.3.0/go.mod h1:JR6WtHb+2LUe8TCKY3cZOxFyyO8IZAc4RVcycCCAKdM= @@ -823,7 +825,9 @@ github.com/sirupsen/logrus v1.9.0/go.mod h1:naHLuLoDiP4jHNo9R0sCBMtWGeIprob74mVs github.com/slok/kubewebhook v0.11.0 h1:mRUOHXpMNxROTcqGVq06BQX2r13cT1Kjw0ylcJhTg0g= github.com/slok/kubewebhook v0.11.0/go.mod h1:HWkaQH3ZbQpLeP3ylW/NPhOaYByxCIRU36HPmUEoqyo= github.com/smartystreets/assertions v0.0.0-20180927180507-b2de0cb4f26d/go.mod h1:OnSkiWE9lh6wB0YB77sQom3nweQdgAjqCqsofrRNTgc= +github.com/smartystreets/assertions v1.1.0 h1:MkTeG1DMwsrdH7QtLXy5W+fUxWq+vmb6cLmyJ7aRtF0= github.com/smartystreets/assertions v1.1.0/go.mod h1:tcbTF8ujkAEcZ8TElKY+i30BzYlVhC/LOxJk7iOWnoo= +github.com/smartystreets/goconvey v1.6.4 h1:fv0U8FUIMPNf1L9lnHLvLhgicrIVChEkdzIKYqbNC9s= github.com/smartystreets/goconvey v1.6.4/go.mod h1:syvi0/a8iFYH4r/RixwvyeAJjdLS9QV7WQ/tjFTllLA= github.com/soheilhy/cmux v0.1.4/go.mod h1:IM3LyeVVIOuxMH7sFAkER9+bJ4dT7Ms6E4xg4kGIyLM= github.com/soheilhy/cmux v0.1.5 h1:jjzc5WVemNEDTLwv9tlmemhC73tI08BNOIGwBOo10Js= @@ -1003,6 +1007,8 @@ go.uber.org/zap v1.17.0/go.mod h1:MXVU+bhUf/A7Xi2HNOnopQOrmycQ5Ih87HtOu4q5SSo= go.uber.org/zap v1.19.0/go.mod h1:xg/QME4nWcxGxrpdeYfq7UvYrLh66cuVKdrbD1XF/NI= go.uber.org/zap v1.19.1 h1:ue41HOKd1vGURxrmeKIgELGb3jPW9DMUDGtsinblHwI= go.uber.org/zap v1.19.1/go.mod h1:j3DNczoxDZroyBnOT1L/Q79cfUMGZxlv/9dzN7SM1rI= +golang.org/x/arch v0.0.0-20201008161808-52c3e6f60cff h1:XmKBi9R6duxOB3lfc72wyrwiOY7X2Jl1wuI+RFOyMDE= +golang.org/x/arch v0.0.0-20201008161808-52c3e6f60cff/go.mod h1:flIaEI6LNU6xOCD5PaJvn9wGP0agmIOqjrtsKGRguv4= golang.org/x/crypto v0.0.0-20180904163835-0709b304e793/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4= golang.org/x/crypto v0.0.0-20181029021203-45a5f77698d3/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4= golang.org/x/crypto v0.0.0-20190211182817-74369b46fc67/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4= @@ -1367,7 +1373,6 @@ golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8T golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors 
v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= gomodules.xyz/jsonpatch/v2 v2.2.0 h1:4pT439QV83L+G9FkcCriY6EkpcK6r6bK+A5FBUMI7qY= -gomodules.xyz/jsonpatch/v2 v2.2.0/go.mod h1:WXp+iVDkoLQqPudfQ9GBlwB2eZ5DKOnjQZCYdOS8GPY= gomodules.xyz/jsonpatch/v3 v3.0.1 h1:Te7hKxV52TKCbNYq3t84tzKav3xhThdvSsSp/W89IyI= gomodules.xyz/jsonpatch/v3 v3.0.1/go.mod h1:CBhndykehEwTOlEfnsfJwvkFQbSN8YZFr9M+cIHAJto= gomodules.xyz/orderedmap v0.1.0 h1:fM/+TGh/O1KkqGR5xjTKg6bU8OKBkg7p0Y+x/J9m8Os= diff --git a/pkg/client/control/resourcerecommend.go b/pkg/client/control/resourcerecommend.go new file mode 100644 index 000000000..fae879a37 --- /dev/null +++ b/pkg/client/control/resourcerecommend.go @@ -0,0 +1,114 @@ +/* +Copyright 2022 The Katalyst Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package control + +import ( + "context" + "encoding/json" + "fmt" + + v1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/types" + "k8s.io/apimachinery/pkg/util/jsonmergepatch" + + apis "github.com/kubewharf/katalyst-api/pkg/apis/recommendation/v1alpha1" + clientset "github.com/kubewharf/katalyst-api/pkg/client/clientset/versioned" +) + +// ResourceRecommendUpdater is used to update ResourceRecommend CR +type ResourceRecommendUpdater interface { + UpdateResourceRecommend(ctx context.Context, rec *apis.ResourceRecommend, + opts v1.UpdateOptions) (*apis.ResourceRecommend, error) + + PatchResourceRecommend(ctx context.Context, oldRec *apis.ResourceRecommend, + newRec *apis.ResourceRecommend) error + + CreateResourceRecommend(ctx context.Context, rec *apis.ResourceRecommend, + opts v1.CreateOptions) (*apis.ResourceRecommend, error) +} +type DummyResourceRecommendUpdater struct{} + +func (d *DummyResourceRecommendUpdater) UpdateResourceRecommend(_ context.Context, _ *apis.ResourceRecommend, + _ v1.UpdateOptions, +) (*apis.ResourceRecommend, error) { + return nil, nil +} + +func (d *DummyResourceRecommendUpdater) PatchResourceRecommend(_ context.Context, _ *apis.ResourceRecommend, + _ *apis.ResourceRecommend, +) error { + return nil +} + +func (d *DummyResourceRecommendUpdater) CreateResourceRecommend(_ context.Context, _ *apis.ResourceRecommend, + _ v1.CreateOptions, +) (*apis.ResourceRecommend, error) { + return nil, nil +} + +type RealResourceRecommendUpdater struct { + client clientset.Interface +} + +func NewRealResourceRecommendUpdater(client clientset.Interface) *RealResourceRecommendUpdater { + return &RealResourceRecommendUpdater{ + client: client, + } +} + +func (r *RealResourceRecommendUpdater) UpdateResourceRecommend(ctx context.Context, rec *apis.ResourceRecommend, + opts v1.UpdateOptions, +) (*apis.ResourceRecommend, error) { + if rec == nil { + return nil, fmt.Errorf("can't update a nil ResourceRecommend") + } + + return r.client.RecommendationV1alpha1().ResourceRecommends(rec.Namespace).Update(ctx, rec, opts) +} + +func (r *RealResourceRecommendUpdater) PatchResourceRecommend(ctx context.Context, oldRec *apis.ResourceRecommend, + newRec *apis.ResourceRecommend, +) error { 
+ if oldRec == nil || newRec == nil { + return fmt.Errorf("can't patch a nil ResourceRecommend") + } + + oldData, err := json.Marshal(oldRec) + if err != nil { + return err + } + newData, err := json.Marshal(newRec) + if err != nil { + return err + } + + patchBytes, err := jsonmergepatch.CreateThreeWayJSONMergePatch(oldData, newData, oldData) + if err != nil { + return fmt.Errorf("failed to create merge patch for ResourceRecommend %q/%q: %v", oldRec.Namespace, oldRec.Name, err) + } + + _, err = r.client.RecommendationV1alpha1().ResourceRecommends(oldRec.Namespace).Patch(ctx, oldRec.Name, types.MergePatchType, patchBytes, v1.PatchOptions{}, "status") + return err +} + +func (r *RealResourceRecommendUpdater) CreateResourceRecommend(ctx context.Context, rec *apis.ResourceRecommend, opts v1.CreateOptions) (*apis.ResourceRecommend, error) { + if rec == nil { + return nil, fmt.Errorf("can't create a nil ResourceRecommend") + } + + return r.client.RecommendationV1alpha1().ResourceRecommends(rec.Namespace).Create(ctx, rec, opts) +} diff --git a/pkg/client/control/resourcerecommend_test.go b/pkg/client/control/resourcerecommend_test.go new file mode 100644 index 000000000..8ee89c10c --- /dev/null +++ b/pkg/client/control/resourcerecommend_test.go @@ -0,0 +1,206 @@ +/* +Copyright 2022 The Katalyst Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package control + +import ( + "context" + "testing" + + "github.com/stretchr/testify/assert" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + + apis "github.com/kubewharf/katalyst-api/pkg/apis/recommendation/v1alpha1" + externalfake "github.com/kubewharf/katalyst-api/pkg/client/clientset/versioned/fake" +) + +func TestPatchResourceRecommend(t *testing.T) { + t.Parallel() + + oldRec := &apis.ResourceRecommend{ + ObjectMeta: metav1.ObjectMeta{ + Name: "rec1", + Namespace: "default", + }, + } + + newRec := &apis.ResourceRecommend{ + ObjectMeta: metav1.ObjectMeta{ + Name: "rec1", + Namespace: "default", + Annotations: map[string]string{ + "recommendation.katalyst.io/resource-recommend": "true", + }, + }, + } + + for _, tc := range []struct { + name string + oldRec *apis.ResourceRecommend + newRec *apis.ResourceRecommend + gotErr bool + }{ + { + name: "add annotation", + oldRec: oldRec, + newRec: newRec, + gotErr: false, + }, + { + name: "remove annotation", + oldRec: oldRec, + newRec: newRec, + gotErr: false, + }, + { + name: "missing new rec", + oldRec: oldRec, + newRec: nil, + gotErr: true, + }, + { + name: "same rec", + oldRec: oldRec, + newRec: oldRec, + gotErr: false, + }, + } { + tc := tc + t.Run(tc.name, func(t *testing.T) { + t.Parallel() + + internalClient := externalfake.NewSimpleClientset(tc.oldRec) + updater := NewRealResourceRecommendUpdater(internalClient) + err := updater.PatchResourceRecommend(context.TODO(), tc.oldRec, tc.newRec) + assert.Equal(t, tc.gotErr, err != nil) + rec, err := internalClient.RecommendationV1alpha1().
+ ResourceRecommends("default").Get(context.TODO(), tc.oldRec.Name, metav1.GetOptions{}) + assert.NoError(t, err) + if !tc.gotErr { + assert.Equal(t, tc.newRec, rec) + } + }) + } +} + +func TestCreateResourceRecommend(t *testing.T) { + t.Parallel() + + rec := &apis.ResourceRecommend{ + ObjectMeta: metav1.ObjectMeta{ + Name: "rec1", + Namespace: "default", + }, + } + + for _, tc := range []struct { + name string + rec *apis.ResourceRecommend + gotErr bool + }{ + { + name: "create rec", + rec: rec, + gotErr: false, + }, + { + name: "missing rec", + rec: nil, + gotErr: true, + }, + } { + tc := tc + t.Run(tc.name, func(t *testing.T) { + t.Parallel() + internalClient := externalfake.NewSimpleClientset() + updater := NewRealResourceRecommendUpdater(internalClient) + rec, err := updater.CreateResourceRecommend(context.TODO(), tc.rec, metav1.CreateOptions{}) + assert.Equal(t, tc.gotErr, err != nil) + assert.Equal(t, tc.rec, rec) + }) + } +} + +func TestUpdateResourceRecommend(t *testing.T) { + t.Parallel() + + oldRec := &apis.ResourceRecommend{ + ObjectMeta: metav1.ObjectMeta{ + Name: "rec1", + Namespace: "default", + }, + } + + newRec := &apis.ResourceRecommend{ + ObjectMeta: metav1.ObjectMeta{ + Name: "rec1", + Namespace: "default", + Annotations: map[string]string{ + "recommendation.katalyst.io/resource-recommend": "true", + }, + }, + } + + for _, tc := range []struct { + name string + oldRec *apis.ResourceRecommend + newRec *apis.ResourceRecommend + gotErr bool + }{ + { + name: "add annotation", + oldRec: oldRec, + newRec: newRec, + gotErr: false, + }, + { + name: "remove annotation", + oldRec: oldRec, + newRec: newRec, + gotErr: false, + }, + { + name: "new rec is nil", + oldRec: oldRec, + newRec: nil, + gotErr: true, + }, + } { + tc := tc + t.Run(tc.name, func(t *testing.T) { + t.Parallel() + + internalClient := externalfake.NewSimpleClientset(tc.oldRec) + updater := NewRealResourceRecommendUpdater(internalClient) + rec, err := updater.UpdateResourceRecommend(context.TODO(), tc.newRec, metav1.UpdateOptions{}) + assert.Equal(t, tc.gotErr, err != nil) + assert.Equal(t, tc.newRec, rec) + }) + } +} + +func TestDummyUpdater(t *testing.T) { + t.Parallel() + updater := DummyResourceRecommendUpdater{} + + assert.NoError(t, updater.PatchResourceRecommend(nil, nil, nil)) + + _, err := updater.UpdateResourceRecommend(nil, nil, metav1.UpdateOptions{}) + assert.NoError(t, err) + + _, err = updater.CreateResourceRecommend(nil, nil, metav1.CreateOptions{}) + assert.NoError(t, err) +} diff --git a/pkg/config/controller/resourcerecommender.go b/pkg/config/controller/resourcerecommender.go index b9b903884..edb8f2aed 100644 --- a/pkg/config/controller/resourcerecommender.go +++ b/pkg/config/controller/resourcerecommender.go @@ -17,6 +17,8 @@ limitations under the License. package controller import ( + "time" + "github.com/kubewharf/katalyst-core/pkg/util/datasource/prometheus" ) @@ -34,6 +36,10 @@ type ResourceRecommenderConfig struct { // LogVerbosityLevel to specify log verbosity level. (The default level is 4) // Set it to something larger than 4 if more detailed logs are needed. 
LogVerbosityLevel string + + // number of workers to sync + RecSyncWorkers int + RecSyncPeriod time.Duration } func NewResourceRecommenderConfig() *ResourceRecommenderConfig { diff --git a/pkg/controller/resource-recommend/controller/controller.go b/pkg/controller/resource-recommend/controller/controller.go deleted file mode 100644 index 0c65fb119..000000000 --- a/pkg/controller/resource-recommend/controller/controller.go +++ /dev/null @@ -1,193 +0,0 @@ -/* -Copyright 2022 The Katalyst Authors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package controller - -import ( - "context" - "fmt" - "os" - "time" - - "github.com/pkg/errors" - "k8s.io/apimachinery/pkg/runtime" - utilruntime "k8s.io/apimachinery/pkg/util/runtime" - clientgoscheme "k8s.io/client-go/kubernetes/scheme" - "k8s.io/klog/v2" - ctrl "sigs.k8s.io/controller-runtime" - "sigs.k8s.io/controller-runtime/pkg/client" - "sigs.k8s.io/controller-runtime/pkg/healthz" - - "github.com/kubewharf/katalyst-api/pkg/apis/recommendation/v1alpha1" - "github.com/kubewharf/katalyst-core/pkg/config/controller" - "github.com/kubewharf/katalyst-core/pkg/config/generic" - "github.com/kubewharf/katalyst-core/pkg/controller/resource-recommend/datasource" - "github.com/kubewharf/katalyst-core/pkg/controller/resource-recommend/datasource/prometheus" - "github.com/kubewharf/katalyst-core/pkg/controller/resource-recommend/oom" - processormanager "github.com/kubewharf/katalyst-core/pkg/controller/resource-recommend/processor/manager" - recommendermanager "github.com/kubewharf/katalyst-core/pkg/controller/resource-recommend/recommender/manager" -) - -var ( - scheme = runtime.NewScheme() - setupLog = ctrl.Log.WithName("setup") - - preCacheObjects = []client.Object{ - &v1alpha1.ResourceRecommend{}, - } -) - -func init() { - utilruntime.Must(clientgoscheme.AddToScheme(scheme)) - - utilruntime.Must(v1alpha1.AddToScheme(scheme)) - //+kubebuilder:scaffold:scheme -} - -type ResourceRecommender struct { - ctx context.Context - genericConf *generic.GenericConfiguration - opts *controller.ResourceRecommenderConfig -} - -func NewResourceRecommenderController(ctx context.Context, genericConf *generic.GenericConfiguration, opts *controller.ResourceRecommenderConfig) (*ResourceRecommender, error) { - return &ResourceRecommender{ - ctx: ctx, - genericConf: genericConf, - opts: opts, - }, nil -} - -func (r ResourceRecommender) Run() { - config := ctrl.GetConfigOrDie() - config.QPS = r.genericConf.ClientConnection.QPS - config.Burst = int(r.genericConf.ClientConnection.Burst) - - ctrlOptions := ctrl.Options{ - Scheme: scheme, - MetricsBindAddress: ":" + r.opts.MetricsBindPort, - HealthProbeBindAddress: ":" + r.opts.HealthProbeBindPort, - Port: 9443, - } - - mgr, err := ctrl.NewManager(config, ctrlOptions) - if err != nil { - klog.Fatal(fmt.Sprintf("unable to start manager: %s", err)) - os.Exit(1) - } - - if err = mgr.AddHealthzCheck("healthz", healthz.Ping); err != nil { - klog.Fatal(fmt.Sprintf("unable to set up health check: %s", err)) - os.Exit(1) - } - if err = 
mgr.AddReadyzCheck("readyz", healthz.Ping); err != nil { - klog.Fatal(fmt.Sprintf("unable to set up ready check: %s", err)) - os.Exit(1) - } - - dataProxy := initDataSources(r.opts) - klog.Infof("successfully init data proxy %v", *dataProxy) - - // Processor Manager - processorManager := processormanager.NewManager(dataProxy, mgr.GetClient()) - go func() { - defer func() { - if r := recover(); r != nil { - err = errors.Errorf("start processor panic: %v", r.(error)) - klog.Error(err) - panic(err) - } - }() - processorManager.StartProcess(r.ctx) - }() - - // OOM Recorder - podOOMRecorder := &PodOOMRecorderController{ - PodOOMRecorder: &oom.PodOOMRecorder{ - Client: mgr.GetClient(), - OOMRecordMaxNumber: r.opts.OOMRecordMaxNumber, - }, - } - if err = podOOMRecorder.SetupWithManager(mgr); err != nil { - klog.Fatal(fmt.Sprintf("Unable to create controller: %s", err)) - os.Exit(1) - } - go func() { - defer func() { - if r := recover(); r != nil { - err = errors.Errorf("Run oom recorder panic: %v", r.(error)) - klog.Error(err) - panic(err) - } - }() - for count := 0; count < 100; count++ { - cacheReady := mgr.GetCache().WaitForCacheSync(r.ctx) - if cacheReady { - break - } - time.Sleep(100 * time.Millisecond) - } - if err = podOOMRecorder.Run(r.ctx.Done()); err != nil { - klog.Warningf("Run oom recorder failed: %v", err) - } - }() - - recommenderManager := recommendermanager.NewManager(*processorManager, podOOMRecorder) - - for _, obj := range preCacheObjects { - _, _ = mgr.GetCache().GetInformer(context.TODO(), obj) - } - - // Resource Recommend Controller - resourceRecommendController := &ResourceRecommendController{ - Client: mgr.GetClient(), - Scheme: mgr.GetScheme(), - ProcessorManager: processorManager, - RecommenderManager: recommenderManager, - } - - if err = resourceRecommendController.SetupWithManager(mgr); err != nil { - klog.Fatal(fmt.Sprintf("ResourceRecommend Controller unable to SetupWithManager, err: %s", err)) - os.Exit(1) - } - - //+kubebuilder:scaffold:builder - - setupLog.Info("starting manager") - if err = mgr.Start(r.ctx); err != nil { - klog.Fatal(fmt.Sprintf("problem running manager: %s", err)) - os.Exit(1) - } -} - -func initDataSources(opts *controller.ResourceRecommenderConfig) *datasource.Proxy { - dataProxy := datasource.NewProxy() - for _, datasourceProvider := range opts.DataSource { - switch datasourceProvider { - case string(datasource.PrometheusDatasource): - fallthrough - default: - // default is prom - prometheusProvider, err := prometheus.NewPrometheus(&opts.DataSourcePromConfig) - if err != nil { - klog.Exitf("unable to create datasource provider %v, err: %v", prometheusProvider, err) - panic(err) - } - dataProxy.RegisterDatasource(datasource.PrometheusDatasource, prometheusProvider) - } - } - return dataProxy -} diff --git a/pkg/controller/resource-recommend/controller/controller_test.go b/pkg/controller/resource-recommend/controller/controller_test.go deleted file mode 100644 index bb8c81702..000000000 --- a/pkg/controller/resource-recommend/controller/controller_test.go +++ /dev/null @@ -1,89 +0,0 @@ -/* -Copyright 2022 The Katalyst Authors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. 
-You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package controller - -import ( - "reflect" - "testing" - "time" - - "bou.ke/monkey" - promapiv1 "github.com/prometheus/client_golang/api/prometheus/v1" - "github.com/stretchr/testify/mock" - - "github.com/kubewharf/katalyst-core/pkg/config/controller" - "github.com/kubewharf/katalyst-core/pkg/controller/resource-recommend/datasource" - resourcerecommendprometheus "github.com/kubewharf/katalyst-core/pkg/controller/resource-recommend/datasource/prometheus" - "github.com/kubewharf/katalyst-core/pkg/util/datasource/prometheus" - datasourcetypes "github.com/kubewharf/katalyst-core/pkg/util/resource-recommend/types/datasource" -) - -type MockDatasource struct { - mock.Mock -} - -func (m *MockDatasource) ConvertMetricToQuery(metric datasourcetypes.Metric) (*datasourcetypes.Query, error) { - args := m.Called(metric) - return args.Get(0).(*datasourcetypes.Query), args.Error(1) -} - -func (m *MockDatasource) QueryTimeSeries(query *datasourcetypes.Query, start, end time.Time, step time.Duration) (*datasourcetypes.TimeSeries, error) { - args := m.Called(query, start, end, step) - return args.Get(0).(*datasourcetypes.TimeSeries), args.Error(1) -} - -func (m *MockDatasource) GetPromClient() promapiv1.API { - args := m.Called() - return args.Get(0).(promapiv1.API) -} - -func Test_initDataSources(t *testing.T) { - proxy := datasource.NewProxy() - mockDatasource := MockDatasource{} - proxy.RegisterDatasource(datasource.PrometheusDatasource, &mockDatasource) - type args struct { - opts *controller.ResourceRecommenderConfig - } - tests := []struct { - name string - args args - want *datasource.Proxy - }{ - { - name: "return_Datasource", - args: args{ - opts: &controller.ResourceRecommenderConfig{ - DataSource: []string{string(datasource.PrometheusDatasource)}, - }, - }, - want: proxy, - }, - } - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - defer monkey.UnpatchAll() - - monkey.Patch(resourcerecommendprometheus.NewPrometheus, func(config *prometheus.PromConfig) (resourcerecommendprometheus.PromDatasource, error) { - return &mockDatasource, nil - }) - - if got := initDataSources(tt.args.opts); !reflect.DeepEqual(got, tt.want) { - t.Errorf("initDataSources() = %v, want %v", got, tt.want) - } - }) - } -} diff --git a/pkg/controller/resource-recommend/controller/oom_recorder_controller.go b/pkg/controller/resource-recommend/controller/oom_recorder_controller.go index 400cf9e7f..afa300e70 100644 --- a/pkg/controller/resource-recommend/controller/oom_recorder_controller.go +++ b/pkg/controller/resource-recommend/controller/oom_recorder_controller.go @@ -18,49 +18,113 @@ package controller import ( "context" + "fmt" - v1 "k8s.io/api/core/v1" + "github.com/pkg/errors" + core "k8s.io/api/core/v1" + utilruntime "k8s.io/apimachinery/pkg/util/runtime" + "k8s.io/client-go/tools/cache" "k8s.io/client-go/util/workqueue" "k8s.io/klog/v2" - ctrl "sigs.k8s.io/controller-runtime" - "sigs.k8s.io/controller-runtime/pkg/client" - "sigs.k8s.io/controller-runtime/pkg/controller" + katalystbase "github.com/kubewharf/katalyst-core/cmd/base" + "github.com/kubewharf/katalyst-core/pkg/config/controller" + 
"github.com/kubewharf/katalyst-core/pkg/config/generic" "github.com/kubewharf/katalyst-core/pkg/controller/resource-recommend/oom" ) -// PodOOMRecorderController reconciles a PodOOMRecorder object +const OOMRecorderControllerName = "oomRecorder" + +// PodOOMRecorderController controls pod oom events recorder type PodOOMRecorderController struct { - *oom.PodOOMRecorder + ctx context.Context + syncedFunc []cache.InformerSynced + Recorder *oom.PodOOMRecorder +} + +// NewPodOOMRecorderController +func NewPodOOMRecorderController(ctx context.Context, + controlCtx *katalystbase.GenericContext, + genericConf *generic.GenericConfiguration, + _ *controller.GenericControllerConfiguration, + recConf *controller.ResourceRecommenderConfig, +) (*PodOOMRecorderController, error) { + if controlCtx == nil { + return nil, fmt.Errorf("controlCtx is invalid") + } + + podInformer := controlCtx.KubeInformerFactory.Core().V1().Pods() + + podOOMRecorderController := &PodOOMRecorderController{ + ctx: ctx, + syncedFunc: []cache.InformerSynced{ + podInformer.Informer().HasSynced, + }, + } + + podOOMRecorderController.Recorder = &oom.PodOOMRecorder{ + Client: controlCtx.Client.KubeClient.CoreV1(), + OOMRecordMaxNumber: recConf.OOMRecordMaxNumber, + Queue: workqueue.New(), + } + + podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ + AddFunc: podOOMRecorderController.addPod, + UpdateFunc: podOOMRecorderController.updatePod, + }) + + return podOOMRecorderController, nil +} + +func (oc *PodOOMRecorderController) Run() { + defer utilruntime.HandleCrash() + defer klog.Infof("[resource-recommend] shutting down %s controller", OOMRecorderControllerName) + + if !cache.WaitForCacheSync(oc.ctx.Done(), oc.syncedFunc...) { + utilruntime.HandleError(fmt.Errorf("unable to sync caches for %s controller", OOMRecorderControllerName)) + return + } + + go func() { + defer func() { + if r := recover(); r != nil { + klog.Error(errors.Errorf("run oom recorder panic: %v", r.(error))) + } + }() + if err := oc.Recorder.Run(oc.ctx.Done()); err != nil { + klog.Warningf("run oom recorder failed: %v", err) + } + }() + + <-oc.ctx.Done() } -//+kubebuilder:rbac:groups=recommendation.katalyst.kubewharf.io,resources=podOOMRecorder,verbs=get;list;watch;create;update;patch;delete -//+kubebuilder:rbac:groups=recommendation.katalyst.kubewharf.io,resources=podOOMRecorder/status,verbs=get;update;patch -//+kubebuilder:rbac:groups=recommendation.katalyst.kubewharf.io,resources=podOOMRecorder/finalizers,verbs=update - -// Reconcile is part of the main kubernetes reconciliation loop which aims to -// move the current state of the cluster closer to the desired state. -// reconcile takes an pod resource.It records information when a Pod experiences an OOM (Out of Memory) state. -// the ResourceRecommend object against the actual cluster state, and then -// perform operations to make the cluster state reflect the state specified by -// the user. 
-// -// For more details, check Reconcile and its Result here: -// - https://pkg.go.dev/sigs.k8s.io/controller-runtime@v0.12.2/pkg/reconcile -func (r *PodOOMRecorderController) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { - klog.V(5).InfoS("Get pods", "NamespacedName", req.NamespacedName) - pod := &v1.Pod{} - err := r.Client.Get(ctx, req.NamespacedName, pod) - if err != nil { - return ctrl.Result{}, client.IgnoreNotFound(err) +func (oc *PodOOMRecorderController) addPod(obj interface{}) { + v, ok := obj.(*core.Pod) + if !ok { + klog.Errorf("cannot convert obj to *core.Pod: %v", obj) + return } + oc.ProcessContainer(v) +} + +func (oc *PodOOMRecorderController) updatePod(oldObj, _ interface{}) { + v, ok := oldObj.(*core.Pod) + if !ok { + klog.Errorf("cannot convert obj to *core.Pod: %v", oldObj) + return + } + oc.ProcessContainer(v) +} + +// ProcessContainer checks for OOM kills in pod containers and enqueues them for processing. +func (oc *PodOOMRecorderController) ProcessContainer(pod *core.Pod) { for _, containerStatus := range pod.Status.ContainerStatuses { if containerStatus.RestartCount > 0 && containerStatus.LastTerminationState.Terminated != nil && containerStatus.LastTerminationState.Terminated.Reason == "OOMKilled" { if container := GetContainer(pod, containerStatus.Name); container != nil { - if memory, ok := container.Resources.Requests[v1.ResourceMemory]; ok { - r.Queue.Add(oom.OOMRecord{ + if memory, ok := container.Resources.Requests[core.ResourceMemory]; ok { + // add the OOM record to the work queue + oc.Recorder.Queue.Add(oom.OOMRecord{ Namespace: pod.Namespace, Pod: pod.Name, Container: containerStatus.Name, @@ -73,21 +137,10 @@ func (r *PodOOMRecorderController) Reconcile(ctx context.Context, req ctrl.Reque } } } - return ctrl.Result{}, nil -} - -// SetupWithManager sets up the controller with the Manager. -func (r *PodOOMRecorderController) SetupWithManager(mgr ctrl.Manager) error { - r.Queue = workqueue.New() - return ctrl.NewControllerManagedBy(mgr). - WithOptions(controller.Options{ - RecoverPanic: true, - }). - For(&v1.Pod{}). - Complete(r) } -func GetContainer(pod *v1.Pod, containerName string) *v1.Container { +// GetContainer returns the container with the given name from the pod spec +func GetContainer(pod *core.Pod, containerName string) *core.Container { for i := range pod.Spec.Containers { if pod.Spec.Containers[i].Name == containerName { return &pod.Spec.Containers[i] diff --git a/pkg/controller/resource-recommend/controller/oom_recorder_controller_test.go b/pkg/controller/resource-recommend/controller/oom_recorder_controller_test.go index 662ee5fe4..e40e30ebd 100644 --- a/pkg/controller/resource-recommend/controller/oom_recorder_controller_test.go +++ b/pkg/controller/resource-recommend/controller/oom_recorder_controller_test.go @@ -17,64 +17,58 @@ limitations under the License.
package controller import ( - "reflect" + "context" "testing" + "time" - v1 "k8s.io/api/core/v1" + "github.com/stretchr/testify/assert" + "k8s.io/client-go/tools/cache" + cliflag "k8s.io/component-base/cli/flag" + + katalystbase "github.com/kubewharf/katalyst-core/cmd/base" + "github.com/kubewharf/katalyst-core/cmd/katalyst-controller/app/options" + "github.com/kubewharf/katalyst-core/pkg/config/controller" + "github.com/kubewharf/katalyst-core/pkg/config/generic" ) -func TestGetContainer(t *testing.T) { - type args struct { - pod *v1.Pod - containerName string - } +func TestOOMRecorderController_Run(t *testing.T) { + t.Parallel() tests := []struct { name string - args args - want *v1.Container }{ { - name: "notFount", - args: args{ - pod: &v1.Pod{ - Spec: v1.PodSpec{ - Containers: []v1.Container{ - { - Name: "c1", - }, - }, - }, - }, - containerName: "cx", - }, - want: nil, - }, - { - name: "Got", - args: args{ - pod: &v1.Pod{ - Spec: v1.PodSpec{ - Containers: []v1.Container{ - { - Name: "c1", - Image: "image1", - }, - }, - }, - }, - containerName: "c1", - }, - want: &v1.Container{ - Name: "c1", - Image: "image1", - }, + name: "correct start", }, } + for _, tt := range tests { + tt := tt t.Run(tt.name, func(t *testing.T) { - if got := GetContainer(tt.args.pod, tt.args.containerName); !reflect.DeepEqual(got, tt.want) { - t.Errorf("GetContainer() = %v, want %v", got, tt.want) + t.Parallel() + + ctx := context.TODO() + genericConf := &generic.GenericConfiguration{} + controllerConf := &controller.GenericControllerConfiguration{ + DynamicGVResources: []string{"deployment.v1.apps"}, } + + fss := &cliflag.NamedFlagSets{} + resourceRecommenderOptions := options.NewResourceRecommenderOptions() + resourceRecommenderOptions.AddFlags(fss) + resourceRecommenderConf := controller.NewResourceRecommenderConfig() + _ = resourceRecommenderOptions.ApplyTo(resourceRecommenderConf) + + controlCtx, err := katalystbase.GenerateFakeGenericContext(nil) + assert.NoError(t, err) + + oc, err := NewPodOOMRecorderController(ctx, controlCtx, genericConf, controllerConf, resourceRecommenderConf) + assert.NoError(t, err) + + controlCtx.StartInformer(ctx) + go oc.Run() + synced := cache.WaitForCacheSync(ctx.Done(), oc.syncedFunc...) 
+ assert.True(t, synced) + time.Sleep(10 * time.Millisecond) }) } } diff --git a/pkg/controller/resource-recommend/controller/resourcerecommend_controller.go b/pkg/controller/resource-recommend/controller/resourcerecommend_controller.go index bf94d100e..dd36eb226 100644 --- a/pkg/controller/resource-recommend/controller/resourcerecommend_controller.go +++ b/pkg/controller/resource-recommend/controller/resourcerecommend_controller.go @@ -19,24 +19,34 @@ package controller import ( "context" "fmt" + "reflect" "runtime/debug" "time" + "github.com/pkg/errors" "golang.org/x/time/rate" k8serrors "k8s.io/apimachinery/pkg/api/errors" - "k8s.io/apimachinery/pkg/runtime" k8stypes "k8s.io/apimachinery/pkg/types" + utilruntime "k8s.io/apimachinery/pkg/util/runtime" + "k8s.io/apimachinery/pkg/util/wait" + memory "k8s.io/client-go/discovery/cached" + "k8s.io/client-go/dynamic" + "k8s.io/client-go/restmapper" + "k8s.io/client-go/tools/cache" "k8s.io/client-go/util/workqueue" "k8s.io/klog/v2" - ctrl "sigs.k8s.io/controller-runtime" - "sigs.k8s.io/controller-runtime/pkg/client" - "sigs.k8s.io/controller-runtime/pkg/controller" - "sigs.k8s.io/controller-runtime/pkg/predicate" "github.com/kubewharf/katalyst-api/pkg/apis/recommendation/v1alpha1" + reclister "github.com/kubewharf/katalyst-api/pkg/client/listers/recommendation/v1alpha1" + katalystbase "github.com/kubewharf/katalyst-core/cmd/base" + "github.com/kubewharf/katalyst-core/pkg/client/control" + "github.com/kubewharf/katalyst-core/pkg/config/controller" + "github.com/kubewharf/katalyst-core/pkg/config/generic" + "github.com/kubewharf/katalyst-core/pkg/controller/resource-recommend/datasource" + "github.com/kubewharf/katalyst-core/pkg/controller/resource-recommend/datasource/prometheus" + "github.com/kubewharf/katalyst-core/pkg/controller/resource-recommend/oom" processormanager "github.com/kubewharf/katalyst-core/pkg/controller/resource-recommend/processor/manager" recommendermanager "github.com/kubewharf/katalyst-core/pkg/controller/resource-recommend/recommender/manager" - resourceutils "github.com/kubewharf/katalyst-core/pkg/util/resource-recommend/resource" conditionstypes "github.com/kubewharf/katalyst-core/pkg/util/resource-recommend/types/conditions" errortypes "github.com/kubewharf/katalyst-core/pkg/util/resource-recommend/types/error" processortypes "github.com/kubewharf/katalyst-core/pkg/util/resource-recommend/types/processor" @@ -44,156 +54,345 @@ import ( ) const ( - ExponentialFailureRateLimiterBaseDelay = time.Minute - ExponentialFailureRateLimiterMaxDelay = 30 * time.Minute - DefaultRecommendInterval = 24 * time.Hour + resourceRecommendControllerName = "resourceRecommend" + exponentialFailureRateLimiterBaseDelay = time.Minute + exponentialFailureRateLimiterMaxDelay = 30 * time.Minute + defaultRecommendInterval = 24 * time.Hour ) -// ResourceRecommendController reconciles a ResourceRecommend object type ResourceRecommendController struct { - client.Client - Scheme *runtime.Scheme + ctx context.Context + conf *controller.ResourceRecommenderConfig + + recUpdater control.ResourceRecommendUpdater + + recLister reclister.ResourceRecommendLister + + recQueue workqueue.RateLimitingInterface + recSyncWorkers int + recSyncPeriod time.Duration + + syncedFunc []cache.InformerSynced + + client dynamic.Interface + restMapper *restmapper.DeferredDiscoveryRESTMapper + + OOMRecorder *oom.PodOOMRecorder ProcessorManager *processormanager.Manager RecommenderManager *recommendermanager.Manager } 
-//+kubebuilder:rbac:groups=recommendation.katalyst.kubewharf.io,resources=resourcerecommends,verbs=get;list;watch;create;update;patch;delete -//+kubebuilder:rbac:groups=recommendation.katalyst.kubewharf.io,resources=resourcerecommends/status,verbs=get;update;patch -//+kubebuilder:rbac:groups=recommendation.katalyst.kubewharf.io,resources=resourcerecommends/finalizers,verbs=update - -// SetupWithManager sets up the controller with the Manager. -func (r *ResourceRecommendController) SetupWithManager(mgr ctrl.Manager) error { - return ctrl.NewControllerManagedBy(mgr). - For(&v1alpha1.ResourceRecommend{}). - // We will only focus on the event of the Spec update, filter update events for status and meta - WithEventFilter(predicate.GenerationChangedPredicate{}). - WithOptions(controller.Options{ - MaxConcurrentReconciles: 10, - RecoverPanic: true, - RateLimiter: workqueue.NewMaxOfRateLimiter( - // For reconcile failures(i.e. reconcile return err), the retry time is (2*minutes)*2^ +func NewResourceRecommendController(ctx context.Context, + controlCtx *katalystbase.GenericContext, + genericConf *generic.GenericConfiguration, + _ *controller.GenericControllerConfiguration, + recConf *controller.ResourceRecommenderConfig, + OOMRecorder *oom.PodOOMRecorder, +) (*ResourceRecommendController, error) { + if controlCtx == nil { + return nil, fmt.Errorf("controlCtx is invalid") + } + + recInformer := controlCtx.InternalInformerFactory.Recommendation().V1alpha1().ResourceRecommends() + + recController := &ResourceRecommendController{ + ctx: ctx, + + recUpdater: &control.DummyResourceRecommendUpdater{}, + recLister: recInformer.Lister(), + recQueue: workqueue.NewNamedRateLimitingQueue( + workqueue.NewMaxOfRateLimiter( + // For syncRec failures(i.e. doRecommend return err), the retry time is (2*minutes)*2^ // The maximum retry time is 24 hours - workqueue.NewItemExponentialFailureRateLimiter(ExponentialFailureRateLimiterBaseDelay, ExponentialFailureRateLimiterMaxDelay), - // 10 qps, 100 bucket size. This is only for retry speed and its only the overall factor (not per item) + workqueue.NewItemExponentialFailureRateLimiter(exponentialFailureRateLimiterBaseDelay, exponentialFailureRateLimiterMaxDelay), + // 10 qps, 100 bucket size. This is only for retry speed, it's only the overall factor (not per item) &workqueue.BucketRateLimiter{Limiter: rate.NewLimiter(rate.Limit(10), 100)}, ), - }). - Complete(r) -} - -// Reconcile is part of the main kubernetes reconciliation loop which aims to -// move the current state of the cluster closer to the desired state. -// compare the state specified by the ResourceRecommend object against the actual cluster state, and then -// perform operations to make the cluster state reflect the state specified by the user. 
-// -// For more details, check Reconcile and its Result here: -// - https://pkg.go.dev/sigs.k8s.io/controller-runtime@v0.12.2/pkg/reconcile -func (r *ResourceRecommendController) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { - klog.InfoS("Get resourceRecommend to reconcile", "req", req) - resourceRecommend := &v1alpha1.ResourceRecommend{} - err := r.Get(ctx, req.NamespacedName, resourceRecommend) + "resourceRecommend"), + recSyncWorkers: recConf.RecSyncWorkers, + recSyncPeriod: recConf.RecSyncPeriod, + + syncedFunc: []cache.InformerSynced{ + recInformer.Informer().HasSynced, + }, + + client: controlCtx.Client.DynamicClient, + restMapper: restmapper.NewDeferredDiscoveryRESTMapper(memory.NewMemCacheClient(controlCtx.Client.DiscoveryClient)), + } + + for _, wf := range controlCtx.DynamicResourcesManager.GetDynamicInformers() { + recController.syncedFunc = append(recController.syncedFunc, wf.Informer.Informer().HasSynced) + } + + recInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ + AddFunc: recController.addRec, + UpdateFunc: recController.updateRec, + DeleteFunc: recController.deleteRec, + }) + + if !genericConf.DryRun { + recController.recUpdater = control.NewRealResourceRecommendUpdater(controlCtx.Client.InternalClient) + } + + // todo: add metricsEmitter + + dataProxy := initDataSources(recConf) + klog.Infof("[resource-recommend] successfully init data proxy %v", *dataProxy) + + recController.ProcessorManager = processormanager.NewManager(dataProxy, recController.recLister) + recController.OOMRecorder = OOMRecorder + recController.RecommenderManager = recommendermanager.NewManager(*recController.ProcessorManager, recController.OOMRecorder) + + return recController, nil +} + +func (rrc *ResourceRecommendController) Run() { + defer utilruntime.HandleCrash() + defer rrc.recQueue.ShutDown() + + defer klog.Infof("[resource-recommend] shutting down %s controller", resourceRecommendControllerName) + + if !cache.WaitForCacheSync(rrc.ctx.Done(), rrc.syncedFunc...) { + utilruntime.HandleError(fmt.Errorf("unable to sync caches for %s controller", resourceRecommendControllerName)) + return + } + + // Start Processor + go func() { + defer func() { + if r := recover(); r != nil { + err := errors.Errorf("start processor panic: %v", r.(error)) + klog.Error(err) + panic(err) + } + }() + rrc.ProcessorManager.StartProcess(rrc.ctx) + }() + + klog.Infof("[resource-recommend] caches are synced for %s controller", resourceRecommendControllerName) + klog.Infof("[resource-recommend] start %v recSyncWorkers", rrc.recSyncWorkers) + + for i := 0; i < rrc.recSyncWorkers; i++ { + go wait.Until(rrc.recWorker, time.Second, rrc.ctx.Done()) + } + + <-rrc.ctx.Done() +} + +func (rrc *ResourceRecommendController) addRec(obj interface{}) { + v, ok := obj.(*v1alpha1.ResourceRecommend) + if !ok { + klog.Errorf("[resource-recommend] cannot convert obj to *apis.ResourceRecommend: %v", obj) + return + } + klog.V(4).Infof("[resource-recommend] notice addition of ResourceRecommend %s", v.Name) + rrc.enqueueRec(v) +} + +func (rrc *ResourceRecommendController) updateRec(oldObj, newObj interface{}) { + oldRec, oldOk := oldObj.(*v1alpha1.ResourceRecommend) + newRec, newOk := newObj.(*v1alpha1.ResourceRecommend) + if !oldOk || !newOk { + klog.Errorf("[resource-recommend] cannot convert obj to *apis.ResourceRecommend: %v", newObj) + return + } + + // Only enqueue when the spec changes, ignore status changes. 
+ if !reflect.DeepEqual(oldRec.Spec, newRec.Spec) { + klog.V(4).Infof("[resource-recommend] notice spec update of ResourceRecommend %s", newRec.Name) + rrc.enqueueRec(newRec) + } +} + +func (rrc *ResourceRecommendController) deleteRec(obj interface{}) { + var rec *v1alpha1.ResourceRecommend = nil + switch t := obj.(type) { + case *v1alpha1.ResourceRecommend: + rec = t + case cache.DeletedFinalStateUnknown: + ok := false + rec, ok = t.Obj.(*v1alpha1.ResourceRecommend) + if !ok { + klog.ErrorS(nil, "[resource-recommend] cannot convert obj to *apis.ResourceRecommend", "Obj", t) + return + } + default: + klog.ErrorS(nil, "Cannot convert to *v1alpha1.ResourceRecommend", "Obj", t) + return + } + klog.V(4).Infof("[resource-recommend] notice deletion of ResourceRecommend %s", rec.Name) + rrc.dequeueRec(rec) +} + +func (rrc *ResourceRecommendController) enqueueRec(rec *v1alpha1.ResourceRecommend) { + if rec == nil { + klog.Warningf("[resource-recommend] trying to enqueue a nil rec") + return + } + key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(rec) + if err != nil { + utilruntime.HandleError(err) + return + } + rrc.recQueue.Add(key) +} + +func (rrc *ResourceRecommendController) dequeueRec(rec *v1alpha1.ResourceRecommend) { + if rec == nil { + klog.Warningf("[resource-recommend] trying to dequeue a nil rec") + return + } + + key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(rec) + if err != nil { + utilruntime.HandleError(err) + return + } + rrc.recQueue.Forget(key) + rrc.recQueue.Done(key) +} + +// recWorker continuously processes ResourceRecommends from the work queue. +// RecSyncWorkers goroutines run this loop concurrently to increase throughput. +func (rrc *ResourceRecommendController) recWorker() { + for rrc.ProcessNextResourceRecommend() { + } + klog.V(4).Infof("finish recWorker") +} + +func (rrc *ResourceRecommendController) ProcessNextResourceRecommend() bool { + key, quit := rrc.recQueue.Get() + if quit { + return false + } + defer rrc.recQueue.Done(key) + + timeAfter, err := rrc.syncRec(key.(string)) + if err == nil { + rrc.recQueue.Forget(key) + rrc.recQueue.AddAfter(key, timeAfter) + return true + } + utilruntime.HandleError(fmt.Errorf("sync %q failed with %v", key, err)) + rrc.recQueue.AddRateLimited(key) + return true +} + +// syncRec processes a single ResourceRecommend. On success, the queue item is forgotten; on failure, it is retried. +// It returns the interval until the next sync for the same recommendation, or a negative duration on error.
+func (rrc *ResourceRecommendController) syncRec(key string) (time.Duration, error) { + namespace, name, err := cache.SplitMetaNamespaceKey(key) + if err != nil { + klog.Errorf("[resource-recommend] failed to split namespace and name from key %s", key) + return -1, err + } + + begin := time.Now() + defer func() { + costs := time.Since(begin).Microseconds() + klog.Infof("[resource-recommend] syncing resource [%v/%v] costs %v us", namespace, name, costs) + // todo: add metricsEmitter + }() + + resourceRecommend, err := rrc.recLister.ResourceRecommends(namespace).Get(name) if err != nil { if k8serrors.IsNotFound(err) { - // Object not found, return - klog.V(2).InfoS("ResourceRecommend has been deleted.", "req", req) - // CancelTasks err dno‘t need to be processed, because the Processor side has gc logic - _ = r.CancelTasks(req.NamespacedName) - return ctrl.Result{}, nil + klog.Warningf("[resource-recommend] recommendation %s/%s is not found", namespace, name) + _ = rrc.CancelTasks(k8stypes.NamespacedName{ + Namespace: namespace, + Name: name, + }) + return rrc.recSyncPeriod, nil } - return ctrl.Result{}, err + return -1, err } - if recommender := resourceRecommend.Spec.ResourcePolicy.AlgorithmPolicy.Recommender; recommender != "" && recommender != recommendationtypes.DefaultRecommenderType { - klog.InfoS("ResourceRecommend is not controlled by the default controller", "req", req) - return ctrl.Result{}, nil + klog.InfoS("ResourceRecommend is not controlled by the default controller") + return rrc.recSyncPeriod, nil } if lastRecommendationTime := resourceRecommend.Status.LastRecommendationTime; lastRecommendationTime != nil { - requeueAfter := time.Until(lastRecommendationTime.Add(DefaultRecommendInterval)) + requeueAfter := time.Until(lastRecommendationTime.Add(defaultRecommendInterval)) observedGeneration := resourceRecommend.Status.ObservedGeneration if requeueAfter > time.Duration(0) && observedGeneration == resourceRecommend.GetGeneration() { - klog.InfoS("no spec change and not time to reconcile, skipping this reconcile", "requeueAfter", requeueAfter, "observedGeneration", observedGeneration, "generation", resourceRecommend.GetGeneration(), "resourceRecommendName", resourceRecommend.GetName()) - return ctrl.Result{RequeueAfter: requeueAfter}, nil + klog.InfoS("no spec change and not time to sync, skipping this sync", + "requeueAfter", requeueAfter, + "observedGeneration", observedGeneration, + "generation", resourceRecommend.GetGeneration(), + "resourceRecommendName", resourceRecommend.GetName()) + return requeueAfter, nil } } - err = r.doReconcile(ctx, req.NamespacedName, resourceRecommend) + err = rrc.doRecommend(rrc.ctx, k8stypes.NamespacedName{Namespace: namespace, Name: name}, resourceRecommend) if err != nil { - klog.ErrorS(err, "Failed to reconcile", "req", req) - return ctrl.Result{}, err + klog.ErrorS(err, "failed to sync recommendation ", namespace, "/", name) + return -1, err } - - klog.InfoS("reconcile succeeded, requeue after "+DefaultRecommendInterval.String(), "req", req) - // reconcile succeeded, requeue after 24hours - return ctrl.Result{RequeueAfter: DefaultRecommendInterval}, nil + return rrc.recSyncPeriod, nil } -func (r *ResourceRecommendController) doReconcile(ctx context.Context, namespacedName k8stypes.NamespacedName, +func (rrc *ResourceRecommendController) doRecommend(ctx context.Context, namespacedName k8stypes.NamespacedName, resourceRecommend *v1alpha1.ResourceRecommend, ) (err error) { recommendation := 
recommendationtypes.NewRecommendation(resourceRecommend) defer func() { if r := recover(); r != nil { err = fmt.Errorf("%v", r) - klog.ErrorS(err, "doReconcile panic", "resourceRecommend", namespacedName, "stack", string(debug.Stack())) + klog.ErrorS(err, "doRecommend panic", "resourceRecommend", namespacedName, "stack", string(debug.Stack())) return } - updateStatusErr := r.UpdateRecommendationStatus(namespacedName, recommendation) + updateStatusErr := rrc.UpdateRecommendationStatus(namespacedName, recommendation) if err == nil { err = updateStatusErr } }() - // conduct validation - validationError := recommendation.SetConfig(ctx, r.Client, resourceRecommend) + validationError := recommendation.SetConfig(ctx, rrc.client, resourceRecommend, rrc.restMapper) if validationError != nil { - klog.ErrorS(validationError, "Failed to get Recommendation", "resourceRecommend", namespacedName) + klog.ErrorS(validationError, "failed to get Recommendation", "resourceRecommend", namespacedName) recommendation.Conditions.Set(*conditionstypes.ConvertCustomErrorToCondition(*validationError)) return validationError } - // set the condition of the validation step to be true + recommendation.Conditions.Set(*conditionstypes.ValidationSucceededCondition()) - // Initialization - if registerTaskErr := r.RegisterTasks(*recommendation); registerTaskErr != nil { + if registerTaskErr := rrc.RegisterTasks(*recommendation); registerTaskErr != nil { klog.ErrorS(registerTaskErr, "Failed to register process task", "resourceRecommend", namespacedName) recommendation.Conditions.Set(*conditionstypes.ConvertCustomErrorToCondition(*registerTaskErr)) return registerTaskErr } - // set the condition of the initialization step to be true + recommendation.Conditions.Set(*conditionstypes.InitializationSucceededCondition()) - // recommendation logic - defaultRecommender := r.RecommenderManager.NewRecommender(recommendation.AlgorithmPolicy.Algorithm) + defaultRecommender := rrc.RecommenderManager.NewRecommender(recommendation.AlgorithmPolicy.Algorithm) if recommendationError := defaultRecommender.Recommend(recommendation); recommendationError != nil { klog.ErrorS(recommendationError, "error when getting recommendation for resource", "resourceRecommend", namespacedName) recommendation.Conditions.Set(*conditionstypes.ConvertCustomErrorToCondition(*recommendationError)) return recommendationError } - // set the condition of the recommendation step to be true + recommendation.Conditions.Set(*conditionstypes.RecommendationReadyCondition()) return nil } -// RegisterTasks Register all process task -func (r *ResourceRecommendController) RegisterTasks(recommendation recommendationtypes.Recommendation) *errortypes.CustomError { - processor := r.ProcessorManager.GetProcessor(recommendation.AlgorithmPolicy.Algorithm) +func (rrc *ResourceRecommendController) RegisterTasks(recommendation recommendationtypes.Recommendation) *errortypes.CustomError { + processor := rrc.ProcessorManager.GetProcessor(recommendation.AlgorithmPolicy.Algorithm) for _, container := range recommendation.Containers { for _, containerConfig := range container.ContainerConfigs { - processConfig := processortypes.NewProcessConfig(recommendation.NamespacedName, recommendation.Config.TargetRef, container.ContainerName, containerConfig.ControlledResource, "") + processConfig := processortypes.NewProcessConfig(recommendation.NamespacedName, + recommendation.Config.TargetRef, container.ContainerName, + containerConfig.ControlledResource, "") if err := 
processor.Register(processConfig); err != nil { return errortypes.DataProcessRegisteredFailedError(err.Error()) } } } - return nil } // CancelTasks Cancel all process task -func (r *ResourceRecommendController) CancelTasks(namespacedName k8stypes.NamespacedName) *errortypes.CustomError { - processor := r.ProcessorManager.GetProcessor(v1alpha1.AlgorithmPercentile) +func (rrc *ResourceRecommendController) CancelTasks(namespacedName k8stypes.NamespacedName) *errortypes.CustomError { + processor := rrc.ProcessorManager.GetProcessor(v1alpha1.AlgorithmPercentile) err := processor.Cancel(&processortypes.ProcessKey{ResourceRecommendNamespacedName: namespacedName}) if err != nil { klog.ErrorS(err, "cancel processor task failed", "namespacedName", namespacedName) @@ -201,18 +400,35 @@ func (r *ResourceRecommendController) CancelTasks(namespacedName k8stypes.Namesp return err } -func (r *ResourceRecommendController) UpdateRecommendationStatus(namespaceName k8stypes.NamespacedName, - recommendation *recommendationtypes.Recommendation, -) error { - updateStatus := &v1alpha1.ResourceRecommend{ - Status: recommendation.AsStatus(), +func (rrc *ResourceRecommendController) UpdateRecommendationStatus(namespacedName k8stypes.NamespacedName, recommendation *recommendationtypes.Recommendation) error { + oldRec, err := rrc.recLister.ResourceRecommends(namespacedName.Namespace).Get(namespacedName.Name) + if err != nil { + klog.ErrorS(err, "get old resourceRecommend failed") + return err } - // record generation - updateStatus.Status.ObservedGeneration = recommendation.ObservedGeneration - err := resourceutils.PatchUpdateResourceRecommend(r.Client, namespaceName, updateStatus) - if err != nil { - klog.ErrorS(err, "Update resourceRecommend status error") + newRec := oldRec.DeepCopy() + newRec.Status = recommendation.AsStatus() + newRec.Status.ObservedGeneration = recommendation.ObservedGeneration + + return rrc.recUpdater.PatchResourceRecommend(rrc.ctx, oldRec, newRec) +} + +func initDataSources(opts *controller.ResourceRecommenderConfig) *datasource.Proxy { + dataProxy := datasource.NewProxy() + for _, datasourceProvider := range opts.DataSource { + switch datasourceProvider { + case string(datasource.PrometheusDatasource): + fallthrough + default: + // default is prom + prometheusProvider, err := prometheus.NewPrometheus(&opts.DataSourcePromConfig) + if err != nil { + klog.Exitf("unable to create datasource provider %v, err: %v", prometheusProvider, err) + panic(err) + } + dataProxy.RegisterDatasource(datasource.PrometheusDatasource, prometheusProvider) + } } - return err + return dataProxy } diff --git a/pkg/controller/resource-recommend/controller/resourcerecommend_controller_test.go b/pkg/controller/resource-recommend/controller/resourcerecommend_controller_test.go index 306b7787d..03cc9708f 100644 --- a/pkg/controller/resource-recommend/controller/resourcerecommend_controller_test.go +++ b/pkg/controller/resource-recommend/controller/resourcerecommend_controller_test.go @@ -17,220 +17,68 @@ limitations under the License. 
package controller import ( - "context" - "encoding/json" - "errors" "testing" "time" - "bou.ke/monkey" - v1 "k8s.io/api/core/v1" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/types" - k8sclient "sigs.k8s.io/controller-runtime/pkg/client" + "github.com/bytedance/mockey" + promapiv1 "github.com/prometheus/client_golang/api/prometheus/v1" + "github.com/smartystreets/goconvey/convey" + "github.com/stretchr/testify/mock" - "github.com/kubewharf/katalyst-api/pkg/apis/recommendation/v1alpha1" - processormanager "github.com/kubewharf/katalyst-core/pkg/controller/resource-recommend/processor/manager" - resourceutils "github.com/kubewharf/katalyst-core/pkg/util/resource-recommend/resource" - conditionstypes "github.com/kubewharf/katalyst-core/pkg/util/resource-recommend/types/conditions" + "github.com/kubewharf/katalyst-core/pkg/config/controller" + "github.com/kubewharf/katalyst-core/pkg/controller/resource-recommend/datasource" + resourcerecommendprometheus "github.com/kubewharf/katalyst-core/pkg/controller/resource-recommend/datasource/prometheus" datasourcetypes "github.com/kubewharf/katalyst-core/pkg/util/resource-recommend/types/datasource" - errortypes "github.com/kubewharf/katalyst-core/pkg/util/resource-recommend/types/error" - "github.com/kubewharf/katalyst-core/pkg/util/resource-recommend/types/processor" - recommendationtypes "github.com/kubewharf/katalyst-core/pkg/util/resource-recommend/types/recommendation" ) -func TestResourceRecommendController_UpdateRecommendationStatus(t *testing.T) { - type args struct { - namespaceName types.NamespacedName - recommendation *recommendationtypes.Recommendation - } - tests := []struct { - name string - args args - wantErr bool - }{ - { - name: "test_ObservedGeneration", - args: args{ - recommendation: &recommendationtypes.Recommendation{ - Conditions: &conditionstypes.ResourceRecommendConditionsMap{ - v1alpha1.Validated: { - Type: v1alpha1.Validated, - Status: v1.ConditionTrue, - LastTransitionTime: metav1.NewTime(time.Date(2023, 3, 3, 3, 0, 0, 0, time.UTC)), - }, - v1alpha1.Initialized: { - Type: v1alpha1.Initialized, - Status: v1.ConditionFalse, - LastTransitionTime: metav1.NewTime(time.Date(2023, 4, 4, 4, 0, 0, 0, time.UTC)), - Reason: "reason4", - Message: "test msg4", - }, - }, - Recommendations: []v1alpha1.ContainerResources{ - { - ContainerName: "c1", - }, - }, - ObservedGeneration: 543123451423, - }, - }, - }, - } - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - defer monkey.UnpatchAll() - - monkey.Patch(resourceutils.PatchUpdateResourceRecommend, func(client k8sclient.Client, namespaceName types.NamespacedName, - resourceRecommend *v1alpha1.ResourceRecommend, - ) error { - if resourceRecommend.Status.ObservedGeneration != tt.args.recommendation.ObservedGeneration { - return errors.New("ObservedGeneration not update") - } - return nil - }) - - r := &ResourceRecommendController{} - if err := r.UpdateRecommendationStatus(tt.args.namespaceName, tt.args.recommendation); (err != nil) != tt.wantErr { - t.Errorf("UpdateRecommendationStatus() error = %v, wantErr %v", err, tt.wantErr) - } - }) - } +type MockDatasource struct { + mock.Mock } -var gotProcessConfigList []processor.ProcessConfig - -type mockProcessor struct { - algorithm v1alpha1.Algorithm - runMark string +func (m *MockDatasource) ConvertMetricToQuery(metric datasourcetypes.Metric) (*datasourcetypes.Query, error) { + args := m.Called(metric) + return args.Get(0).(*datasourcetypes.Query), args.Error(1) } -func (p *mockProcessor) Run(_ 
context.Context) { return } - -func (p *mockProcessor) Register(processConfig *processor.ProcessConfig) *errortypes.CustomError { - gotProcessConfigList = append(gotProcessConfigList, *processConfig) - return nil +func (m *MockDatasource) QueryTimeSeries(query *datasourcetypes.Query, start, end time.Time, step time.Duration) (*datasourcetypes.TimeSeries, error) { + args := m.Called(query, start, end, step) + return args.Get(0).(*datasourcetypes.TimeSeries), args.Error(1) } -func (p *mockProcessor) Cancel(_ *processor.ProcessKey) *errortypes.CustomError { return nil } - -func (p *mockProcessor) QueryProcessedValues(_ *processor.ProcessKey) (float64, error) { return 0, nil } +func (m *MockDatasource) GetPromClient() promapiv1.API { + args := m.Called() + return args.Get(0).(promapiv1.API) +} -func TestResourceRecommendController_RegisterTasks_AssignmentTest(t *testing.T) { - recommendation := recommendationtypes.Recommendation{ - NamespacedName: types.NamespacedName{ - Name: "rec1", - Namespace: "default", - }, - Config: recommendationtypes.Config{ - TargetRef: v1alpha1.CrossVersionObjectReference{ - Name: "demo", - Kind: "deployment", - APIVersion: "app/v1", - }, - Containers: []recommendationtypes.Container{ - { - ContainerName: "c1", - ContainerConfigs: []recommendationtypes.ContainerConfig{ - { - ControlledResource: v1.ResourceCPU, - ResourceBufferPercent: 34, - }, - { - ControlledResource: v1.ResourceMemory, - ResourceBufferPercent: 43, - }, - }, - }, - { - ContainerName: "c2", - ContainerConfigs: []recommendationtypes.ContainerConfig{ - { - ControlledResource: v1.ResourceMemory, - ResourceBufferPercent: 53, - }, - }, - }, - }, - }, +func Test_initDataSources(t *testing.T) { + proxy := datasource.NewProxy() + mockDatasource := MockDatasource{} + proxy.RegisterDatasource(datasource.PrometheusDatasource, &mockDatasource) + type args struct { + opts *controller.ResourceRecommenderConfig } - - wantProcessConfigList := []processor.ProcessConfig{ - { - ProcessKey: processor.ProcessKey{ - ResourceRecommendNamespacedName: types.NamespacedName{ - Name: "rec1", - Namespace: "default", - }, - Metric: &datasourcetypes.Metric{ - Namespace: "default", - WorkloadName: "demo", - Kind: "deployment", - APIVersion: "app/v1", - ContainerName: "c1", - Resource: v1.ResourceCPU, - }, - }, - }, - { - ProcessKey: processor.ProcessKey{ - ResourceRecommendNamespacedName: types.NamespacedName{ - Name: "rec1", - Namespace: "default", - }, - Metric: &datasourcetypes.Metric{ - Namespace: "default", - WorkloadName: "demo", - Kind: "deployment", - APIVersion: "app/v1", - ContainerName: "c1", - Resource: v1.ResourceMemory, - }, - }, - }, + tests := []struct { + name string + args args + want *datasource.Proxy + }{ { - ProcessKey: processor.ProcessKey{ - ResourceRecommendNamespacedName: types.NamespacedName{ - Name: "rec1", - Namespace: "default", - }, - Metric: &datasourcetypes.Metric{ - Namespace: "default", - WorkloadName: "demo", - Kind: "deployment", - APIVersion: "app/v1", - ContainerName: "c2", - Resource: v1.ResourceMemory, + name: "return_Datasource", + args: args{ + opts: &controller.ResourceRecommenderConfig{ + DataSource: []string{string(datasource.PrometheusDatasource)}, }, }, + want: proxy, }, } + for _, tt := range tests { + defer mockey.UnPatchAll() + mockey.PatchConvey(tt.name, t, func() { + mockey.Mock(resourcerecommendprometheus.NewPrometheus).Return(&mockDatasource, nil).Build() - t.Run("AssignmentTest", func(t *testing.T) { - defer monkey.UnpatchAll() - - processor1 := &mockProcessor{} - manager := 
&processormanager.Manager{} - manager.ProcessorRegister(v1alpha1.AlgorithmPercentile, processor1) - r := &ResourceRecommendController{ - ProcessorManager: manager, - } - - if err := r.RegisterTasks(recommendation); err != nil { - t.Errorf("RegisterTasks Assignment failed") - } - - got, err1 := json.Marshal(gotProcessConfigList) - if err1 != nil { - t.Errorf("RegisterTasks Assignment got json.Marshal failed") - } - want, err2 := json.Marshal(wantProcessConfigList) - if err2 != nil { - t.Errorf("RegisterTasks Assignment want json.Marshal failed") - } - - if string(got) != string(want) { - t.Errorf("RegisterTasks Assignment processConfig failed, got: %s, want: %s", string(got), string(want)) - } - }) + got := initDataSources(tt.args.opts) + convey.So(got, convey.ShouldResemble, tt.want) + }) + } } diff --git a/pkg/controller/resource-recommend/oom/oom_recorder.go b/pkg/controller/resource-recommend/oom/oom_recorder.go index 944800e31..ce76cc74a 100644 --- a/pkg/controller/resource-recommend/oom/oom_recorder.go +++ b/pkg/controller/resource-recommend/oom/oom_recorder.go @@ -24,12 +24,12 @@ import ( "time" "github.com/pkg/errors" - v1 "k8s.io/api/core/v1" + apierrors "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/api/resource" - "k8s.io/apimachinery/pkg/types" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + corev1 "k8s.io/client-go/kubernetes/typed/core/v1" "k8s.io/client-go/util/workqueue" "k8s.io/klog/v2" - "sigs.k8s.io/controller-runtime/pkg/client" ) const ( @@ -53,7 +53,7 @@ type OOMRecord struct { } type PodOOMRecorder struct { - client.Client + Client corev1.CoreV1Interface mu sync.Mutex @@ -141,13 +141,10 @@ func (r *PodOOMRecorder) updateOOMRecordConfigMap() error { if err != nil { return err } - oomConfigMap := &v1.ConfigMap{} - err = r.Client.Get(context.TODO(), types.NamespacedName{ - Namespace: ConfigMapOOMRecordNameSpace, - Name: ConfigMapOOMRecordName, - }, oomConfigMap) + oomConfigMap, err := r.Client.ConfigMaps(ConfigMapOOMRecordNameSpace). + Get(context.TODO(), ConfigMapOOMRecordName, metav1.GetOptions{ResourceVersion: "0"}) if err != nil { - if client.IgnoreNotFound(err) != nil { + if !apierrors.IsNotFound(err) { return err } oomConfigMap.Name = ConfigMapOOMRecordName @@ -155,12 +152,14 @@ func (r *PodOOMRecorder) updateOOMRecordConfigMap() error { oomConfigMap.Data = map[string]string{ ConfigMapDataOOMRecord: string(cacheData), } - return r.Client.Create(context.TODO(), oomConfigMap) + _, err = r.Client.ConfigMaps(ConfigMapOOMRecordNameSpace).Create(context.TODO(), oomConfigMap, metav1.CreateOptions{}) + return err } oomConfigMap.Data = map[string]string{ ConfigMapDataOOMRecord: string(cacheData), } - return r.Client.Update(context.TODO(), oomConfigMap) + _, err = r.Client.ConfigMaps(ConfigMapOOMRecordNameSpace).Update(context.TODO(), oomConfigMap, metav1.UpdateOptions{}) + return err } func (r *PodOOMRecorder) Run(stopCh <-chan struct{}) error { @@ -212,13 +211,14 @@ func (r *PodOOMRecorder) Run(stopCh <-chan struct{}) error { } func (r *PodOOMRecorder) ListOOMRecordsFromConfigmap() ([]OOMRecord, error) { - oomConfigMap := &v1.ConfigMap{} - err := r.Client.Get(context.TODO(), types.NamespacedName{ - Namespace: ConfigMapOOMRecordNameSpace, - Name: ConfigMapOOMRecordName, - }, oomConfigMap) + oomConfigMap, err := r.Client.ConfigMaps(ConfigMapOOMRecordNameSpace). 
+ Get(context.TODO(), ConfigMapOOMRecordName, metav1.GetOptions{ResourceVersion: "0"}) if err != nil { - return nil, client.IgnoreNotFound(err) + // if ConfigMap cant be found, we return an empty list + if apierrors.IsNotFound(err) { + return []OOMRecord{}, nil + } + return nil, err } oomRecords := make([]OOMRecord, 0) err = json.Unmarshal([]byte(oomConfigMap.Data[ConfigMapDataOOMRecord]), &oomRecords) diff --git a/pkg/controller/resource-recommend/oom/oom_recorder_test.go b/pkg/controller/resource-recommend/oom/oom_recorder_test.go index 1e0085709..a4ad38f77 100644 --- a/pkg/controller/resource-recommend/oom/oom_recorder_test.go +++ b/pkg/controller/resource-recommend/oom/oom_recorder_test.go @@ -23,7 +23,8 @@ import ( v1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/resource" - "sigs.k8s.io/controller-runtime/pkg/client/fake" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + k8sfake "k8s.io/client-go/kubernetes/fake" ) func TestCleanOOMRecord(t *testing.T) { @@ -166,7 +167,7 @@ func TestCleanOOMRecord(t *testing.T) { } func TestListOOMRecordsFromConfigmap(t *testing.T) { - dummyClient := fake.NewClientBuilder().WithObjects(&v1.ConfigMap{}).Build() + dummyClient := k8sfake.NewSimpleClientset().CoreV1() dummyPodOOMRecorder := PodOOMRecorder{ Client: dummyClient, } @@ -181,8 +182,11 @@ func TestListOOMRecordsFromConfigmap(t *testing.T) { } oomConfigMap.SetName(ConfigMapOOMRecordName) oomConfigMap.SetNamespace(ConfigMapOOMRecordNameSpace) - dummyPodOOMRecorder.Client.Create(context.TODO(), oomConfigMap) - oomRecords, err := dummyPodOOMRecorder.ListOOMRecordsFromConfigmap() + _, err := dummyPodOOMRecorder.Client.ConfigMaps(ConfigMapOOMRecordNameSpace).Create(context.TODO(), oomConfigMap, metav1.CreateOptions{}) + if err != nil { + t.Errorf("Failed to create oom record: %v", err) + } + oomRecords, err = dummyPodOOMRecorder.ListOOMRecordsFromConfigmap() if err != nil || len(oomRecords) != 2 { t.Errorf("Expected oomRecords length is 2 and err is nil, but actual oomRecords length is %v and err is %v", len(oomRecords), err) } @@ -190,7 +194,7 @@ func TestListOOMRecordsFromConfigmap(t *testing.T) { func TestUpdateOOMRecordCache(t *testing.T) { now := time.Now() - dummyClient := fake.NewClientBuilder().WithObjects(&v1.ConfigMap{}).Build() + dummyClient := k8sfake.NewSimpleClientset().CoreV1() podOOMRecorderList := []PodOOMRecorder{ { Client: dummyClient, @@ -270,7 +274,7 @@ func TestUpdateOOMRecordCache(t *testing.T) { } func TestUpdateOOMRecordConfigMap(t *testing.T) { - dummyClient := fake.NewClientBuilder().WithObjects(&v1.ConfigMap{}).Build() + dummyClient := k8sfake.NewSimpleClientset().CoreV1() oomConfigMap := &v1.ConfigMap{ Data: map[string]string{ ConfigMapDataOOMRecord: `[]`, @@ -290,8 +294,12 @@ func TestUpdateOOMRecordConfigMap(t *testing.T) { }, }, } - err := dummyPodOOMRecorder.updateOOMRecordConfigMap() + _, err := dummyClient.ConfigMaps(ConfigMapOOMRecordNameSpace).Create(context.TODO(), oomConfigMap, metav1.CreateOptions{}) if err != nil { + t.Fatalf("Failed to create oom record: %v", err) + } + + if err := dummyPodOOMRecorder.updateOOMRecordConfigMap(); err != nil { t.Errorf("Expected the configMap was successfully created,but actually an error:%v occurred.", err) } oomRecordList, _ := dummyPodOOMRecorder.ListOOMRecordsFromConfigmap() @@ -301,9 +309,13 @@ func TestUpdateOOMRecordConfigMap(t *testing.T) { dummyPodOOMRecorder.cache[index], oomRecordList[index]) } } + if err := dummyPodOOMRecorder.Client.ConfigMaps(ConfigMapOOMRecordNameSpace).Delete(context.TODO(), 
oomConfigMap.GetName(), metav1.DeleteOptions{}); err != nil { + t.Errorf("Delete ConfigMaps meet error: %v", err) + } - dummyPodOOMRecorder.Client.DeleteAllOf(context.TODO(), oomConfigMap) - dummyPodOOMRecorder.Client.Create(context.TODO(), oomConfigMap) + if _, err := dummyPodOOMRecorder.Client.ConfigMaps(ConfigMapOOMRecordNameSpace).Create(context.TODO(), oomConfigMap, metav1.CreateOptions{}); err != nil { + t.Errorf("Create ConfigMaps meet error: %v", err) + } err = dummyPodOOMRecorder.updateOOMRecordConfigMap() if err != nil { t.Errorf("Expected the configMap was successfully updated,but actually an error:%v occurred.", err) diff --git a/pkg/controller/resource-recommend/processor/manager/processor_manager.go b/pkg/controller/resource-recommend/processor/manager/processor_manager.go index 259fbe794..aa332ae04 100644 --- a/pkg/controller/resource-recommend/processor/manager/processor_manager.go +++ b/pkg/controller/resource-recommend/processor/manager/processor_manager.go @@ -21,9 +21,8 @@ import ( "runtime/debug" "sync" - "sigs.k8s.io/controller-runtime/pkg/client" - "github.com/kubewharf/katalyst-api/pkg/apis/recommendation/v1alpha1" + lister "github.com/kubewharf/katalyst-api/pkg/client/listers/recommendation/v1alpha1" "github.com/kubewharf/katalyst-core/pkg/controller/resource-recommend/datasource" "github.com/kubewharf/katalyst-core/pkg/controller/resource-recommend/processor" "github.com/kubewharf/katalyst-core/pkg/controller/resource-recommend/processor/percentile" @@ -34,8 +33,8 @@ type Manager struct { processors map[v1alpha1.Algorithm]processor.Processor } -func NewManager(datasourceProxy *datasource.Proxy, c client.Client) *Manager { - percentileProcessor := percentile.NewProcessor(datasourceProxy, c) +func NewManager(datasourceProxy *datasource.Proxy, lister lister.ResourceRecommendLister) *Manager { + percentileProcessor := percentile.NewProcessor(datasourceProxy, lister) return &Manager{ processors: map[v1alpha1.Algorithm]processor.Processor{ v1alpha1.AlgorithmPercentile: percentileProcessor, diff --git a/pkg/controller/resource-recommend/processor/percentile/process_gc.go b/pkg/controller/resource-recommend/processor/percentile/process_gc.go index 293188944..4acfb5a9b 100644 --- a/pkg/controller/resource-recommend/processor/percentile/process_gc.go +++ b/pkg/controller/resource-recommend/processor/percentile/process_gc.go @@ -21,10 +21,9 @@ import ( "runtime/debug" "time" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/types" "k8s.io/klog/v2" - k8sclient "sigs.k8s.io/controller-runtime/pkg/client" "github.com/kubewharf/katalyst-api/pkg/apis/recommendation/v1alpha1" "github.com/kubewharf/katalyst-core/pkg/controller/resource-recommend/processor/percentile/task" @@ -77,8 +76,7 @@ func (p *Processor) garbageCollect(ctx context.Context) error { }) // List all ResourceRecommend CR up to now - resourceRecommendList := &v1alpha1.ResourceRecommendList{} - err := p.Client.List(ctx, resourceRecommendList, &k8sclient.ListOptions{Raw: &metav1.ListOptions{ResourceVersion: "0"}}) + resourceRecommendItems, err := p.Lister.List(labels.Everything()) if err != nil { log.ErrorS(ctx, err, "garbage collect list all ResourceRecommend failed") return err @@ -86,10 +84,10 @@ func (p *Processor) garbageCollect(ctx context.Context) error { // p.ClearingNoAttributionTask(resourceRecommendList) klog.InfoS("percentile processor garbage collect list ResourceRecommend", - "ResourceRecommend Count", len(resourceRecommendList.Items)) + 
"ResourceRecommend Count", len(resourceRecommendItems)) // Convert the ResourceRecommend list to map for quick check whether it exists existResourceRecommends := make(map[types.NamespacedName]v1alpha1.CrossVersionObjectReference) - for _, existResourceRecommend := range resourceRecommendList.Items { + for _, existResourceRecommend := range resourceRecommendItems { namespacedName := types.NamespacedName{ Name: existResourceRecommend.Name, Namespace: existResourceRecommend.Namespace, diff --git a/pkg/controller/resource-recommend/processor/percentile/process_gc_test.go b/pkg/controller/resource-recommend/processor/percentile/process_gc_test.go index b5e2215f1..4cb0f942f 100644 --- a/pkg/controller/resource-recommend/processor/percentile/process_gc_test.go +++ b/pkg/controller/resource-recommend/processor/percentile/process_gc_test.go @@ -23,21 +23,20 @@ import ( v1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/types" - "sigs.k8s.io/controller-runtime/pkg/client/fake" - "github.com/kubewharf/katalyst-api/pkg/apis/recommendation/v1alpha1" - "github.com/kubewharf/katalyst-api/pkg/client/clientset/versioned/scheme" + katalystbase "github.com/kubewharf/katalyst-core/cmd/base" "github.com/kubewharf/katalyst-core/pkg/controller/resource-recommend/processor/percentile/task" datasourcetypes "github.com/kubewharf/katalyst-core/pkg/util/resource-recommend/types/datasource" processortypes "github.com/kubewharf/katalyst-core/pkg/util/resource-recommend/types/processor" ) func TestProcessor_garbageCollect(t *testing.T) { - s := scheme.Scheme - s.AddKnownTypes(v1alpha1.SchemeGroupVersion, &v1alpha1.ResourceRecommend{}) - client := fake.NewClientBuilder().WithScheme(scheme.Scheme).Build() + controlCtx, err := katalystbase.GenerateFakeGenericContext() + if err != nil { + t.Fatal(err) + } processor := Processor{ - Client: client, + Lister: controlCtx.InternalInformerFactory.Recommendation().V1alpha1().ResourceRecommends().Lister(), AggregateTasks: &sync.Map{}, ResourceRecommendTaskIDsMap: make(map[types.NamespacedName]*map[datasourcetypes.Metric]processortypes.TaskID), } diff --git a/pkg/controller/resource-recommend/processor/percentile/processor.go b/pkg/controller/resource-recommend/processor/percentile/processor.go index aae9e879f..adf1bbaa8 100644 --- a/pkg/controller/resource-recommend/processor/percentile/processor.go +++ b/pkg/controller/resource-recommend/processor/percentile/processor.go @@ -27,8 +27,8 @@ import ( "k8s.io/apimachinery/pkg/types" "k8s.io/client-go/util/workqueue" "k8s.io/klog/v2" - "sigs.k8s.io/controller-runtime/pkg/client" + "github.com/kubewharf/katalyst-api/pkg/client/listers/recommendation/v1alpha1" "github.com/kubewharf/katalyst-core/pkg/controller/resource-recommend/datasource" "github.com/kubewharf/katalyst-core/pkg/controller/resource-recommend/processor" "github.com/kubewharf/katalyst-core/pkg/controller/resource-recommend/processor/percentile/task" @@ -52,7 +52,7 @@ const ( type Processor struct { mutex sync.Mutex - client.Client + Lister v1alpha1.ResourceRecommendLister DatasourceProxy *datasource.Proxy @@ -70,11 +70,11 @@ var DefaultQueueRateLimiter = workqueue.NewMaxOfRateLimiter( &workqueue.BucketRateLimiter{Limiter: rate.NewLimiter(rate.Limit(10), 100)}, ) -func NewProcessor(datasourceProxy *datasource.Proxy, c client.Client) processor.Processor { +func NewProcessor(datasourceProxy *datasource.Proxy, lister v1alpha1.ResourceRecommendLister) processor.Processor { return &Processor{ DatasourceProxy: datasourceProxy, TaskQueue: 
workqueue.NewNamedRateLimitingQueue(DefaultQueueRateLimiter, ProcessorName), - Client: c, + Lister: lister, AggregateTasks: &sync.Map{}, ResourceRecommendTaskIDsMap: make(map[types.NamespacedName]*map[datasourcetypes.Metric]processortypes.TaskID), } diff --git a/pkg/util/resource-recommend/resource/k8s_resource.go b/pkg/util/resource-recommend/resource/k8s_resource.go index c16f45eb7..645925fd5 100644 --- a/pkg/util/resource-recommend/resource/k8s_resource.go +++ b/pkg/util/resource-recommend/resource/k8s_resource.go @@ -18,32 +18,43 @@ package resource import ( "context" - "encoding/json" "github.com/pkg/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" - k8stypes "k8s.io/apimachinery/pkg/types" + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/client-go/dynamic" + "k8s.io/client-go/restmapper" "k8s.io/klog/v2" - k8sclient "sigs.k8s.io/controller-runtime/pkg/client" "github.com/kubewharf/katalyst-api/pkg/apis/recommendation/v1alpha1" ) -func ConvertAndGetResource(ctx context.Context, client k8sclient.Client, namespace string, targetRef v1alpha1.CrossVersionObjectReference) (*unstructured.Unstructured, error) { - klog.V(5).Infof("Get resource in", "targetRef", targetRef, "namespace", namespace) - obj := &unstructured.Unstructured{} - obj.SetAPIVersion(targetRef.APIVersion) - obj.SetKind(targetRef.Kind) - if err := client.Get(ctx, k8stypes.NamespacedName{Namespace: namespace, Name: targetRef.Name}, obj); err != nil { +func ConvertAndGetResource(ctx context.Context, client dynamic.Interface, + namespace string, targetRef v1alpha1.CrossVersionObjectReference, + mapper *restmapper.DeferredDiscoveryRESTMapper, +) (*unstructured.Unstructured, error) { + klog.V(5).Infof("get resource in targetRef: %v, namespace: %v", targetRef, namespace) + gvk := schema.FromAPIVersionAndKind(targetRef.APIVersion, targetRef.Kind) + mapping, err := mapper.RESTMapping(gvk.GroupKind(), gvk.Version) + if err != nil { + return nil, err + } + + resource, err := client.Resource(schema.GroupVersionResource{ + Group: gvk.Group, + Version: gvk.Version, + Resource: mapping.Resource.Resource, + }).Namespace(namespace).Get(ctx, targetRef.Name, metav1.GetOptions{ResourceVersion: "0"}) + if err != nil { return nil, err } - return obj, nil + return resource, nil } -func GetAllClaimedContainers(controller *unstructured.Unstructured) ([]string, error) { - klog.V(5).InfoS("Get all controller claimed containers", "controller", controller) - templateSpec, found, err := unstructured.NestedMap(controller.Object, "spec", "template", "spec") +func GetAllClaimedContainers(resource *unstructured.Unstructured) ([]string, error) { + klog.V(5).InfoS("Get all controller claimed containers", "resource", resource) + templateSpec, found, err := unstructured.NestedMap(resource.Object, "spec", "template", "spec") if err != nil { return nil, errors.Wrapf(err, "unstructured.NestedMap err") } @@ -74,36 +85,3 @@ func GetAllClaimedContainers(controller *unstructured.Unstructured) ([]string, e } return containerNames, nil } - -type patchRecord struct { - Op string `json:"op,inline"` - Path string `json:"path,inline"` - Value interface{} `json:"value"` -} - -func PatchUpdateResourceRecommend(client k8sclient.Client, namespaceName k8stypes.NamespacedName, - resourceRecommend *v1alpha1.ResourceRecommend, -) error { - obj := &v1alpha1.ResourceRecommend{ - ObjectMeta: metav1.ObjectMeta{ - Name: namespaceName.Name, - Namespace: namespaceName.Namespace, - }, - } - patches := []patchRecord{{ - Op: 
"replace", - Path: "/status", - Value: resourceRecommend.Status, - }} - - patch, err := json.Marshal(patches) - if err != nil { - return errors.Wrapf(err, "failed to Marshal resourceRecommend: %+v", resourceRecommend) - } - patchDate := k8sclient.RawPatch(k8stypes.JSONPatchType, patch) - err = client.Status().Patch(context.TODO(), obj, patchDate) - if err != nil { - return errors.Wrapf(err, "failed to patch resource") - } - return nil -} diff --git a/pkg/util/resource-recommend/resource/k8s_resource_test.go b/pkg/util/resource-recommend/resource/k8s_resource_test.go index 2fb50b55b..e71d87e4b 100644 --- a/pkg/util/resource-recommend/resource/k8s_resource_test.go +++ b/pkg/util/resource-recommend/resource/k8s_resource_test.go @@ -21,12 +21,10 @@ import ( "reflect" "testing" - v1 "k8s.io/api/core/v1" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" - "k8s.io/apimachinery/pkg/types" - "sigs.k8s.io/controller-runtime/pkg/client" - "sigs.k8s.io/controller-runtime/pkg/client/fake" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/client-go/dynamic" + dynamicfake "k8s.io/client-go/dynamic/fake" "github.com/kubewharf/katalyst-api/pkg/apis/recommendation/v1alpha1" ) @@ -34,7 +32,7 @@ import ( func TestConvertAndGetResource(t *testing.T) { type args struct { ctx context.Context - client client.Client + client dynamic.Interface namespace string targetRef v1alpha1.CrossVersionObjectReference } @@ -55,7 +53,7 @@ func TestConvertAndGetResource(t *testing.T) { name: "Get resource failed", args: args{ ctx: context.TODO(), - client: fake.NewClientBuilder().Build(), + client: dynamicfake.NewSimpleDynamicClient(runtime.NewScheme()), namespace: "fakeNamespace", targetRef: v1alpha1.CrossVersionObjectReference{ Kind: "Deployment", @@ -75,7 +73,7 @@ func TestConvertAndGetResource(t *testing.T) { name: "Get resource failed", args: args{ ctx: context.TODO(), - client: fake.NewClientBuilder().Build(), + client: dynamicfake.NewSimpleDynamicClient(runtime.NewScheme()), namespace: "fakeNamespace", targetRef: v1alpha1.CrossVersionObjectReference{ Kind: "Deployment", @@ -94,8 +92,10 @@ func TestConvertAndGetResource(t *testing.T) { } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - CreateMockUnstructured(nil, nil, tt.env.name, tt.env.namespace, tt.env.apiVersion, tt.env.kind, tt.args.client) - _, err := ConvertAndGetResource(tt.args.ctx, tt.args.client, tt.args.namespace, tt.args.targetRef) + if err := CreateMockUnstructured(nil, nil, tt.env.name, tt.env.namespace, tt.env.apiVersion, tt.env.kind, tt.args.client); err != nil { + t.Errorf("CreateMockUnstructured() gotErr = %v", err) + } + _, err := ConvertAndGetResource(tt.args.ctx, tt.args.client, tt.args.namespace, tt.args.targetRef, CreateMockRESTMapper()) if (err != nil) != tt.wantErr { t.Errorf("ConvertAndGetResource() error = %v, wantErr %v", err, tt.wantErr) return @@ -216,226 +216,3 @@ func TestGetAllClaimedContainers(t *testing.T) { }) } } - -func TestPatchUpdateResourceRecommend(t *testing.T) { - type args struct { - client client.Client - namespaceName types.NamespacedName - resourceRecommend *v1alpha1.ResourceRecommend - } - tests := []struct { - name string - args args - wantErr bool - wantResourceRecommend *v1alpha1.ResourceRecommend - }{ - { - name: "all right 1", - args: args{ - client: fake.NewClientBuilder().Build(), - namespaceName: types.NamespacedName{ - Name: "mockName", - Namespace: "mockNamespace", - }, - resourceRecommend: &v1alpha1.ResourceRecommend{ - TypeMeta: metav1.TypeMeta{ - 
Kind: "ResourceRecommend", - APIVersion: "autoscaling.katalyst.kubewharf.io/v1alpha1", - }, - ObjectMeta: metav1.ObjectMeta{ - Name: "mockName", - Namespace: "mockNamespace", - }, - Status: v1alpha1.ResourceRecommendStatus{}, - }, - }, - wantResourceRecommend: &v1alpha1.ResourceRecommend{ - Status: v1alpha1.ResourceRecommendStatus{ - RecommendResources: &v1alpha1.RecommendResources{ - ContainerRecommendations: []v1alpha1.ContainerResources{ - { - ContainerName: "ContainerName1", - Requests: &v1alpha1.ContainerResourceList{}, - }, - { - ContainerName: "ContainerName2", - Requests: &v1alpha1.ContainerResourceList{}, - }, - { - ContainerName: "ContainerName3", - Requests: &v1alpha1.ContainerResourceList{}, - }, - { - ContainerName: "ContainerName4", - Requests: &v1alpha1.ContainerResourceList{}, - }, - }, - }, - Conditions: []v1alpha1.ResourceRecommendCondition{ - { - Type: v1alpha1.Validated, - Reason: "reason1", - Status: v1.ConditionTrue, - Message: "Message", - }, - { - Type: v1alpha1.Validated, - Reason: "reason2", - Status: v1.ConditionTrue, - Message: "Message", - }, - { - Type: v1alpha1.Validated, - Reason: "reason3", - Status: v1.ConditionTrue, - Message: "Message", - }, - { - Type: v1alpha1.Validated, - Reason: "reason4", - Status: v1.ConditionTrue, - Message: "Message", - }, - }, - }, - }, - wantErr: false, - }, - { - name: "all right 2", - args: args{ - client: fake.NewClientBuilder().Build(), - namespaceName: types.NamespacedName{ - Name: "mockName", - Namespace: "mockNamespace", - }, - resourceRecommend: &v1alpha1.ResourceRecommend{ - TypeMeta: metav1.TypeMeta{ - Kind: "ResourceRecommend", - APIVersion: "autoscaling.katalyst.kubewharf.io/v1alpha1", - }, - ObjectMeta: metav1.ObjectMeta{ - Name: "mockName", - Namespace: "mockNamespace", - }, - Status: v1alpha1.ResourceRecommendStatus{ - RecommendResources: &v1alpha1.RecommendResources{ - ContainerRecommendations: []v1alpha1.ContainerResources{ - { - ContainerName: "ContainerName0", - }, - { - ContainerName: "ContainerName1", - }, - { - ContainerName: "ContainerName2", - }, - { - ContainerName: "ContainerName3", - }, - }, - }, - Conditions: []v1alpha1.ResourceRecommendCondition{ - { - Type: v1alpha1.Validated, - Reason: "reason0", - Status: v1.ConditionTrue, - Message: "Message", - }, - { - Type: v1alpha1.Validated, - Reason: "reason1", - Status: v1.ConditionTrue, - Message: "Message", - }, - { - Type: v1alpha1.Validated, - Reason: "reason2", - Status: v1.ConditionTrue, - Message: "Message", - }, - { - Type: v1alpha1.Validated, - Reason: "reason3", - Status: v1.ConditionTrue, - Message: "Message", - }, - }, - }, - }, - }, - wantResourceRecommend: &v1alpha1.ResourceRecommend{ - Status: v1alpha1.ResourceRecommendStatus{ - RecommendResources: &v1alpha1.RecommendResources{ - ContainerRecommendations: []v1alpha1.ContainerResources{ - { - ContainerName: "ContainerName1", - Requests: &v1alpha1.ContainerResourceList{}, - }, - { - ContainerName: "ContainerName2", - Requests: &v1alpha1.ContainerResourceList{}, - }, - { - ContainerName: "ContainerName3", - Requests: &v1alpha1.ContainerResourceList{}, - }, - { - ContainerName: "ContainerName4", - Requests: &v1alpha1.ContainerResourceList{}, - }, - }, - }, - Conditions: []v1alpha1.ResourceRecommendCondition{ - { - Type: v1alpha1.Validated, - Reason: "reason1", - Status: v1.ConditionTrue, - Message: "Message", - }, - { - Type: v1alpha1.Validated, - Reason: "reason2", - Status: v1.ConditionTrue, - Message: "Message", - }, - { - Type: v1alpha1.Validated, - Reason: "reason3", - Status: 
v1.ConditionTrue, - Message: "Message", - }, - { - Type: v1alpha1.Validated, - Reason: "reason4", - Status: v1.ConditionTrue, - Message: "Message", - }, - }, - }, - }, - wantErr: false, - }, - } - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - v1alpha1.AddToScheme(tt.args.client.Scheme()) - tt.args.client.Create(context.TODO(), tt.args.resourceRecommend) - if err := PatchUpdateResourceRecommend(tt.args.client, tt.args.namespaceName, tt.args.resourceRecommend); (err != nil) != tt.wantErr { - t.Errorf("PatchUpdateResourceRecommend() error = %v, wantErr %v", err, tt.wantErr) - } - if err := PatchUpdateResourceRecommend(tt.args.client, tt.args.namespaceName, tt.wantResourceRecommend); (err != nil) != tt.wantErr { - t.Errorf("PatchUpdateResourceRecommend() error = %v, wantErr %v", err, tt.wantErr) - } - gotResourceRecommend := &v1alpha1.ResourceRecommend{} - tt.args.client.Get(context.TODO(), client.ObjectKey{ - Name: tt.args.namespaceName.Name, - Namespace: tt.args.namespaceName.Namespace, - }, gotResourceRecommend) - if !reflect.DeepEqual(gotResourceRecommend.Status, tt.wantResourceRecommend.Status) { - t.Errorf("PatchUpdateResourceRecommend() gotResourceRecommend.Status = %v, wantResourceRecommend.Status = %v", - gotResourceRecommend.Status, tt.wantResourceRecommend.Status) - } - }) - } -} diff --git a/pkg/util/resource-recommend/resource/k8s_resource_test_util.go b/pkg/util/resource-recommend/resource/k8s_resource_test_util.go index 57e4a90c5..545f618a8 100644 --- a/pkg/util/resource-recommend/resource/k8s_resource_test_util.go +++ b/pkg/util/resource-recommend/resource/k8s_resource_test_util.go @@ -20,11 +20,19 @@ import ( "context" v1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/meta" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" - "sigs.k8s.io/controller-runtime/pkg/client" + "k8s.io/apimachinery/pkg/runtime/schema" + memory "k8s.io/client-go/discovery/cached" + discoveryfake "k8s.io/client-go/discovery/fake" + "k8s.io/client-go/dynamic" + corev1 "k8s.io/client-go/kubernetes/typed/core/v1" + "k8s.io/client-go/restmapper" + clientTesting "k8s.io/client-go/testing" ) -func CreateMockPod(labels, annotations map[string]string, name, namespace, nodeName string, containers []v1.Container, client client.Client) { +func CreateMockPod(labels, annotations map[string]string, name, namespace, nodeName string, containers []v1.Container, client corev1.CoreV1Interface) error { pod := &v1.Pod{ Spec: v1.PodSpec{ NodeName: nodeName, @@ -36,10 +44,11 @@ func CreateMockPod(labels, annotations map[string]string, name, namespace, nodeN pod.SetName(name) pod.SetNamespace(namespace) - client.Create(context.TODO(), pod) + _, err := client.Pods(namespace).Create(context.TODO(), pod, metav1.CreateOptions{}) + return err } -func CreateMockUnstructured(matchLabels, unstructuredTemplateSpec map[string]interface{}, name, namespace, apiVersion, kind string, client client.Client) { +func CreateMockUnstructured(matchLabels, unstructuredTemplateSpec map[string]interface{}, name, namespace, apiVersion, kind string, client dynamic.Interface) error { collectorObject := &unstructured.Unstructured{} collectorObject.SetName(name) collectorObject.SetNamespace(namespace) @@ -47,5 +56,26 @@ func CreateMockUnstructured(matchLabels, unstructuredTemplateSpec map[string]int collectorObject.SetAPIVersion(apiVersion) unstructured.SetNestedMap(collectorObject.Object, matchLabels, "spec", "selector", "matchLabels") 
unstructured.SetNestedMap(collectorObject.Object, unstructuredTemplateSpec, "spec", "template", "spec") - client.Create(context.TODO(), collectorObject) + + gvk := schema.FromAPIVersionAndKind(apiVersion, kind) + gvr, _ := meta.UnsafeGuessKindToResource(gvk) + + _, err := client.Resource(gvr).Namespace(namespace).Create(context.TODO(), collectorObject, metav1.CreateOptions{}) + return err +} + +func CreateMockRESTMapper() *restmapper.DeferredDiscoveryRESTMapper { + fakeDiscoveryClient := &discoveryfake.FakeDiscovery{Fake: &clientTesting.Fake{}} + fakeDiscoveryClient.Resources = []*metav1.APIResourceList{ + { + GroupVersion: "apps/v1", + APIResources: []metav1.APIResource{ + {Name: "pods", SingularName: "pod", Namespaced: true, Kind: "Pod"}, + {Name: "deployments", SingularName: "deployment", Namespaced: true, Kind: "Deployment"}, + {Name: "kinds", SingularName: "kind", Namespaced: true, Kind: "Kind"}, + }, + }, + } + restMapper := restmapper.NewDeferredDiscoveryRESTMapper(memory.NewMemCacheClient(fakeDiscoveryClient)) + return restMapper } diff --git a/pkg/util/resource-recommend/types/conditions/conditions_test.go b/pkg/util/resource-recommend/types/conditions/conditions_test.go index 4bcd47ed7..7a0dc8d0a 100644 --- a/pkg/util/resource-recommend/types/conditions/conditions_test.go +++ b/pkg/util/resource-recommend/types/conditions/conditions_test.go @@ -21,7 +21,8 @@ import ( "testing" "time" - "bou.ke/monkey" + "github.com/bytedance/mockey" + "github.com/smartystreets/goconvey/convey" v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -173,16 +174,12 @@ func TestResourceRecommendConditionsMap_Set(t *testing.T) { }, } for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - defer monkey.UnpatchAll() - - monkey.Patch(time.Now, func() time.Time { return tt.args.fakeTime }) + mockey.PatchConvey(tt.name, t, func() { + defer mockey.UnPatchAll() + mockey.Mock(time.Now).Return(tt.args.fakeTime).Build() tt.conditionsMap.Set(tt.args.condition) - - if !reflect.DeepEqual(tt.conditionsMap, tt.want) { - t.Errorf("conditionsMap set failed, got: %v, want: %v", tt.conditionsMap, tt.want) - } + convey.So(tt.conditionsMap, convey.ShouldResemble, tt.want) }) } } diff --git a/pkg/util/resource-recommend/types/recommendation/recommendation.go b/pkg/util/resource-recommend/types/recommendation/recommendation.go index f5ed229a4..7ae3bb4e8 100644 --- a/pkg/util/resource-recommend/types/recommendation/recommendation.go +++ b/pkg/util/resource-recommend/types/recommendation/recommendation.go @@ -22,8 +22,9 @@ import ( v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" + "k8s.io/client-go/dynamic" + "k8s.io/client-go/restmapper" "k8s.io/klog/v2" - k8sclient "sigs.k8s.io/controller-runtime/pkg/client" "github.com/kubewharf/katalyst-api/pkg/apis/recommendation/v1alpha1" conditionstypes "github.com/kubewharf/katalyst-core/pkg/util/resource-recommend/types/conditions" @@ -96,8 +97,8 @@ func NewRecommendation(resourceRecommend *v1alpha1.ResourceRecommend) *Recommend } } -func (r *Recommendation) SetConfig(ctx context.Context, client k8sclient.Client, - resourceRecommend *v1alpha1.ResourceRecommend, +func (r *Recommendation) SetConfig(ctx context.Context, client dynamic.Interface, + resourceRecommend *v1alpha1.ResourceRecommend, mapper *restmapper.DeferredDiscoveryRESTMapper, ) *errortypes.CustomError { targetRef, customErr := ValidateAndExtractTargetRef(resourceRecommend.Spec.TargetRef) if customErr != nil { @@ -113,7 +114,8 @@ func (r 
*Recommendation) SetConfig(ctx context.Context, client k8sclient.Client, return customErr } - containers, customErr := ValidateAndExtractContainers(ctx, client, resourceRecommend.Namespace, targetRef, resourceRecommend.Spec.ResourcePolicy.ContainerPolicies) + containers, customErr := ValidateAndExtractContainers(ctx, client, resourceRecommend.Namespace, + targetRef, resourceRecommend.Spec.ResourcePolicy.ContainerPolicies, mapper) if customErr != nil { klog.Errorf("spec.resourcePolicy.containerPolicies validate error, "+ "reason: %s, msg: %s", customErr.Code, customErr.Message) diff --git a/pkg/util/resource-recommend/types/recommendation/recommendation_test.go b/pkg/util/resource-recommend/types/recommendation/recommendation_test.go index 875c66de3..6faac86f8 100644 --- a/pkg/util/resource-recommend/types/recommendation/recommendation_test.go +++ b/pkg/util/resource-recommend/types/recommendation/recommendation_test.go @@ -22,13 +22,15 @@ import ( "testing" "time" - "bou.ke/monkey" + "github.com/bytedance/mockey" + "github.com/smartystreets/goconvey/convey" v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - k8sclient "sigs.k8s.io/controller-runtime/pkg/client" - "sigs.k8s.io/controller-runtime/pkg/client/fake" + "k8s.io/apimachinery/pkg/runtime" + dynamicfake "k8s.io/client-go/dynamic/fake" "github.com/kubewharf/katalyst-api/pkg/apis/recommendation/v1alpha1" + resourceutils "github.com/kubewharf/katalyst-core/pkg/util/resource-recommend/resource" conditionstypes "github.com/kubewharf/katalyst-core/pkg/util/resource-recommend/types/conditions" errortypes "github.com/kubewharf/katalyst-core/pkg/util/resource-recommend/types/error" ) @@ -125,12 +127,10 @@ func TestRecommendation_AsStatus(t *testing.T) { }, }, } + + mockey.Mock(time.Now).Return(fakeTime1).Build() for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - defer monkey.UnpatchAll() - - monkey.Patch(time.Now, func() time.Time { return fakeTime1 }) - if got := tt.recommendation.AsStatus(); !reflect.DeepEqual(got, tt.want) { t.Errorf("AsStatus() = %v, want %v", got, tt.want) } @@ -216,40 +216,24 @@ func TestRecommendation_SetConfig(t *testing.T) { wantErr: nil, }, } + defer mockey.UnPatchAll() for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - defer monkey.UnpatchAll() - - monkey.Patch(ValidateAndExtractTargetRef, func(targetRefReq v1alpha1.CrossVersionObjectReference) ( - v1alpha1.CrossVersionObjectReference, *errortypes.CustomError, - ) { - return tt.args.targetRef, tt.args.customErr1 - }) - monkey.Patch(ValidateAndExtractAlgorithmPolicy, func(algorithmPolicyReq v1alpha1.AlgorithmPolicy) ( - v1alpha1.AlgorithmPolicy, *errortypes.CustomError, - ) { - return tt.args.algorithmPolicy, tt.args.customErr2 - }) - monkey.Patch(ValidateAndExtractContainers, func(ctx context.Context, client k8sclient.Client, namespace string, - targetRef v1alpha1.CrossVersionObjectReference, - containerPolicies []v1alpha1.ContainerResourcePolicy, - ) ([]Container, *errortypes.CustomError) { - return tt.args.containers, tt.args.customErr3 - }) + mockey.PatchConvey(tt.name, t, func() { + mockey.Mock(ValidateAndExtractTargetRef).Return(tt.args.targetRef, tt.args.customErr1).Build() + mockey.Mock(ValidateAndExtractAlgorithmPolicy).Return(tt.args.algorithmPolicy, tt.args.customErr2).Build() + mockey.Mock(ValidateAndExtractContainers).Return(tt.args.containers, tt.args.customErr3).Build() r := NewRecommendation(&v1alpha1.ResourceRecommend{}) - if gotErr := r.SetConfig(context.Background(), 
fake.NewClientBuilder().Build(), &v1alpha1.ResourceRecommend{}); !reflect.DeepEqual(gotErr, tt.wantErr) { - t.Errorf("SetConfig() = %v, want %v", gotErr, tt.wantErr) - } + dynamicClient := dynamicfake.NewSimpleDynamicClient(runtime.NewScheme()) + gotErr := r.SetConfig(context.Background(), dynamicClient, &v1alpha1.ResourceRecommend{}, resourceutils.CreateMockRESTMapper()) + convey.So(gotErr, convey.ShouldResemble, tt.wantErr) if tt.wantErr == nil { config := Config{ TargetRef: tt.args.targetRef, AlgorithmPolicy: tt.args.algorithmPolicy, Containers: tt.args.containers, } - if !reflect.DeepEqual(config, r.Config) { - t.Errorf("SetConfig() failed, want config: %v, got: %v", config, r.Config) - } + convey.So(r.Config, convey.ShouldResemble, config) } }) } diff --git a/pkg/util/resource-recommend/types/recommendation/validate.go b/pkg/util/resource-recommend/types/recommendation/validate.go index 69187920f..56926efb3 100644 --- a/pkg/util/resource-recommend/types/recommendation/validate.go +++ b/pkg/util/resource-recommend/types/recommendation/validate.go @@ -19,6 +19,8 @@ package recommendation import ( "context" + "k8s.io/client-go/dynamic" + "k8s.io/client-go/restmapper" "k8s.io/klog/v2" k8sclient "sigs.k8s.io/controller-runtime/pkg/client" @@ -64,16 +66,17 @@ func ValidateAndExtractAlgorithmPolicy(algorithmPolicyReq v1alpha1.AlgorithmPoli return algorithmPolicy, nil } -func ValidateAndExtractContainers(ctx context.Context, client k8sclient.Client, namespace string, +func ValidateAndExtractContainers(ctx context.Context, client dynamic.Interface, namespace string, targetRef v1alpha1.CrossVersionObjectReference, - containerPolicies []v1alpha1.ContainerResourcePolicy) ( + containerPolicies []v1alpha1.ContainerResourcePolicy, + mapper *restmapper.DeferredDiscoveryRESTMapper) ( []Container, *errortypes.CustomError, ) { if len(containerPolicies) == 0 { return nil, errortypes.ContainerPoliciesNotFoundError() } - resource, err := resourceutils.ConvertAndGetResource(ctx, client, namespace, targetRef) + resource, err := resourceutils.ConvertAndGetResource(ctx, client, namespace, targetRef, mapper) if err != nil { klog.ErrorS(err, "ConvertAndGetResource err") if k8sclient.IgnoreNotFound(err) == nil { diff --git a/pkg/util/resource-recommend/types/recommendation/validate_test.go b/pkg/util/resource-recommend/types/recommendation/validate_test.go index c5a0ad39d..646ceeddc 100644 --- a/pkg/util/resource-recommend/types/recommendation/validate_test.go +++ b/pkg/util/resource-recommend/types/recommendation/validate_test.go @@ -23,8 +23,10 @@ import ( v1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/runtime" - "sigs.k8s.io/controller-runtime/pkg/client" - "sigs.k8s.io/controller-runtime/pkg/client/fake" + "k8s.io/client-go/dynamic" + dynamicfake "k8s.io/client-go/dynamic/fake" + k8sfake "k8s.io/client-go/kubernetes/fake" + corev1 "k8s.io/client-go/kubernetes/typed/core/v1" "github.com/kubewharf/katalyst-api/pkg/apis/recommendation/v1alpha1" resourceutils "github.com/kubewharf/katalyst-core/pkg/util/resource-recommend/resource" @@ -114,7 +116,8 @@ func TestValidateAndExtractAlgorithmPolicy(t *testing.T) { func TestValidateAndExtractContainers(t *testing.T) { type args struct { ctx context.Context - client client.Client + dynamicClient dynamic.Interface + coreClient corev1.CoreV1Interface namespace string targetRef v1alpha1.CrossVersionObjectReference containerPolicies []v1alpha1.ContainerResourcePolicy @@ -142,13 +145,14 @@ func TestValidateAndExtractContainers(t *testing.T) { { name: 
errortypes.WorkloadNotFoundMessage, args: args{ - ctx: context.TODO(), - client: fake.NewClientBuilder().Build(), - namespace: "mockNamespace", + ctx: context.TODO(), + dynamicClient: dynamicfake.NewSimpleDynamicClient(runtime.NewScheme()), + coreClient: k8sfake.NewSimpleClientset().CoreV1(), + namespace: "mockNamespace", targetRef: v1alpha1.CrossVersionObjectReference{ - Kind: "kind", + Kind: "Kind", Name: "Name", - APIVersion: "version", + APIVersion: "apps/v1", }, containerPolicies: []v1alpha1.ContainerResourcePolicy{ { @@ -162,9 +166,10 @@ func TestValidateAndExtractContainers(t *testing.T) { { name: errortypes.WorkloadMatchedErrorMessage, args: args{ - ctx: context.TODO(), - client: fake.NewClientBuilder().Build(), - namespace: "mockNamespace", + ctx: context.TODO(), + dynamicClient: dynamicfake.NewSimpleDynamicClient(runtime.NewScheme()), + coreClient: k8sfake.NewSimpleClientset().CoreV1(), + namespace: "mockNamespace", targetRef: v1alpha1.CrossVersionObjectReference{ Kind: "", Name: "", @@ -187,9 +192,10 @@ func TestValidateAndExtractContainers(t *testing.T) { { name: "all right", args: args{ - ctx: context.TODO(), - client: fake.NewClientBuilder().Build(), - namespace: "mockNamespace", + ctx: context.TODO(), + dynamicClient: dynamicfake.NewSimpleDynamicClient(runtime.NewScheme()), + coreClient: k8sfake.NewSimpleClientset().CoreV1(), + namespace: "mockNamespace", targetRef: v1alpha1.CrossVersionObjectReference{ Name: "mockName5", Kind: "Deployment", @@ -264,9 +270,10 @@ func TestValidateAndExtractContainers(t *testing.T) { { name: "controlled resources is empty", args: args{ - ctx: context.TODO(), - client: fake.NewClientBuilder().Build(), - namespace: "mockNamespace", + ctx: context.TODO(), + dynamicClient: dynamicfake.NewSimpleDynamicClient(runtime.NewScheme()), + coreClient: k8sfake.NewSimpleClientset().CoreV1(), + namespace: "mockNamespace", targetRef: v1alpha1.CrossVersionObjectReference{ Name: "mockName5", Kind: "Deployment", @@ -309,9 +316,10 @@ func TestValidateAndExtractContainers(t *testing.T) { { name: errortypes.ContainersMatchedErrorMessage, args: args{ - ctx: context.TODO(), - client: fake.NewClientBuilder().Build(), - namespace: "mockNamespace", + ctx: context.TODO(), + dynamicClient: dynamicfake.NewSimpleDynamicClient(runtime.NewScheme()), + coreClient: k8sfake.NewSimpleClientset().CoreV1(), + namespace: "mockNamespace", targetRef: v1alpha1.CrossVersionObjectReference{ Name: "mockName5", Kind: "Deployment", @@ -358,9 +366,10 @@ func TestValidateAndExtractContainers(t *testing.T) { { name: "validate containers err", args: args{ - ctx: context.TODO(), - client: fake.NewClientBuilder().Build(), - namespace: "mockNamespace", + ctx: context.TODO(), + dynamicClient: dynamicfake.NewSimpleDynamicClient(runtime.NewScheme()), + coreClient: k8sfake.NewSimpleClientset().CoreV1(), + namespace: "mockNamespace", targetRef: v1alpha1.CrossVersionObjectReference{ Name: "mockName5", Kind: "Deployment", @@ -420,9 +429,14 @@ func TestValidateAndExtractContainers(t *testing.T) { matchLabels := map[string]interface{}{ tt.env.matchLabelKey: tt.env.matchLabelValue, } - resourceutils.CreateMockUnstructured(matchLabels, tt.env.unstructuredTemplateSpec, tt.env.unstructuredName, tt.env.namespace, tt.env.apiVersion, tt.env.kind, tt.args.client) - resourceutils.CreateMockPod(tt.env.podLabels, tt.env.podAnnotations, tt.env.podName, tt.env.namespace, tt.env.podNodeName, nil, tt.args.client) - got, gotErr := ValidateAndExtractContainers(tt.args.ctx, tt.args.client, tt.args.namespace, 
tt.args.targetRef, tt.args.containerPolicies) + if err := resourceutils.CreateMockUnstructured(matchLabels, tt.env.unstructuredTemplateSpec, tt.env.unstructuredName, tt.env.namespace, tt.env.apiVersion, tt.env.kind, tt.args.dynamicClient); err != nil { + t.Errorf("CreateMockUnstructured() gotErr = %v", err) + } + if err := resourceutils.CreateMockPod(tt.env.podLabels, tt.env.podAnnotations, tt.env.podName, tt.env.namespace, tt.env.podNodeName, nil, tt.args.coreClient); err != nil { + t.Errorf("CreateMockPod() gotErr = %v", err) + } + + got, gotErr := ValidateAndExtractContainers(tt.args.ctx, tt.args.dynamicClient, tt.args.namespace, tt.args.targetRef, tt.args.containerPolicies, resourceutils.CreateMockRESTMapper()) SortContainersByContainerName(got) if !reflect.DeepEqual(got, tt.want) { t.Errorf("ValidateAndExtractContainers() got = %v, want %v", got, tt.want)
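For context, these tests exercise the same lookup that ConvertAndGetResource now performs: resolve the targetRef's apiVersion/kind to a GroupVersionResource through the RESTMapper, then fetch the object with the dynamic client. Below is a minimal sketch of that flow; the helper name getTargetWorkload and the argument values are illustrative only and are not part of this patch.

package example

import (
	"context"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
	"k8s.io/apimachinery/pkg/runtime/schema"
	"k8s.io/client-go/dynamic"
	"k8s.io/client-go/restmapper"
)

// getTargetWorkload resolves an (apiVersion, kind) pair to its GroupVersionResource via the
// deferred discovery RESTMapper and then fetches the named object with the dynamic client.
// This mirrors the lookup in ConvertAndGetResource; it is a sketch, not code from the patch.
func getTargetWorkload(ctx context.Context, client dynamic.Interface,
	mapper *restmapper.DeferredDiscoveryRESTMapper,
	namespace, apiVersion, kind, name string,
) (*unstructured.Unstructured, error) {
	gvk := schema.FromAPIVersionAndKind(apiVersion, kind)
	mapping, err := mapper.RESTMapping(gvk.GroupKind(), gvk.Version)
	if err != nil {
		return nil, err
	}
	// mapping.Resource carries group, version, and the plural resource name,
	// e.g. apps/v1 "deployments" for an apps/v1 Deployment.
	return client.Resource(mapping.Resource).Namespace(namespace).Get(ctx, name, metav1.GetOptions{})
}

In the tests, CreateMockRESTMapper supplies the discovery data backing the mapper and the dynamicfake client stands in for the real cluster, so the same code path is covered without an apiserver.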