Skip to content

Commit

Permalink
feat(overcommit): add cron worker for overcommit controller
Browse files Browse the repository at this point in the history
  • Loading branch information
WangZzzhe committed Jul 31, 2024
1 parent 7d9189e commit 5e1420d
Show file tree
Hide file tree
Showing 6 changed files with 781 additions and 2 deletions.
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,7 @@ require (
)

replace (
github.com/kubewharf/katalyst-api => github.com/WangZzzhe/katalyst-api v0.0.0-20240731074329-d5d7befcfd65
k8s.io/api => k8s.io/api v0.24.6
k8s.io/apiextensions-apiserver => k8s.io/apiextensions-apiserver v0.24.6
k8s.io/apimachinery => k8s.io/apimachinery v0.24.6
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,8 @@ github.com/Shopify/sarama v1.19.0/go.mod h1:FVkBWblsNy7DGZRfXLU0O9RCGt5g3g3yEuWX
github.com/Shopify/toxiproxy v2.1.4+incompatible/go.mod h1:OXgGpZ6Cli1/URJOF1DMxUHB2q5Ap20/P/eIdh4G0pI=
github.com/StackExchange/wmi v0.0.0-20180116203802-5d049714c4a6/go.mod h1:3eOhrUMpNV+6aFIbp5/iudMxNCF27Vw2OZgy4xEx0Fg=
github.com/VividCortex/gohistogram v1.0.0/go.mod h1:Pf5mBqqDxYaXu3hDrrU+w6nw50o/4+TcAqDqk/vUH7g=
github.com/WangZzzhe/katalyst-api v0.0.0-20240731074329-d5d7befcfd65 h1:hxlVFJqOan+4VxwTunY7LdjjBa8g9rUR6aqUF+45P3c=
github.com/WangZzzhe/katalyst-api v0.0.0-20240731074329-d5d7befcfd65/go.mod h1:Y2IeIorxQamF2a3oa0+URztl5QCSty6Jj3zD83R8J9k=
github.com/afex/hystrix-go v0.0.0-20180502004556-fa1af6a1f4f5/go.mod h1:SkGFH1ia65gfNATL8TAiHDNxPzPdmEL5uirI2Uyuz6c=
github.com/ajstarks/svgo v0.0.0-20180226025133-644b8db467af/go.mod h1:K08gAheRH3/J6wwsYMMT4xOr94bZjxIelGM0+d/wbFw=
github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc=
Expand Down Expand Up @@ -568,8 +570,6 @@ github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
github.com/kubewharf/katalyst-api v0.5.1-0.20240702044746-be552fd7ea7d h1:6CuK3axf2B63zIkEu5XyxbaC+JArE/3Jo3QHvb+Hn0M=
github.com/kubewharf/katalyst-api v0.5.1-0.20240702044746-be552fd7ea7d/go.mod h1:Y2IeIorxQamF2a3oa0+URztl5QCSty6Jj3zD83R8J9k=
github.com/kubewharf/kubelet v1.24.6-kubewharf.9 h1:jOTYZt7h/J7I8xQMKMUcJjKf5UFBv37jHWvNp5VRFGc=
github.com/kubewharf/kubelet v1.24.6-kubewharf.9/go.mod h1:MxbSZUx3wXztFneeelwWWlX7NAAStJ6expqq7gY2J3c=
github.com/kyoh86/exportloopref v0.1.7/go.mod h1:h1rDl2Kdj97+Kwh4gdz3ujE7XHmH51Q0lUiZ1z4NLj8=
Expand Down
32 changes: 32 additions & 0 deletions pkg/client/control/noc.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"encoding/json"
"fmt"

jsonpatch "github.com/evanphx/json-patch"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/jsonmergepatch"
Expand All @@ -31,6 +32,7 @@ import (
)

// NocUpdater abstracts the write operations performed on
// NodeOvercommitConfig objects, so that callers can be given either a real
// client-backed implementation or a dummy one.
type NocUpdater interface {
// PatchNoc patches the NodeOvercommitConfig object, using oldNoc as the base
// and newNoc as the desired state, and returns the resulting object.
PatchNoc(ctx context.Context, oldNoc, newNoc *v1alpha1.NodeOvercommitConfig) (*v1alpha1.NodeOvercommitConfig, error)
// PatchNocStatus patches the status of the NodeOvercommitConfig, using oldNoc
// as the base and newNoc as the desired state, and returns the resulting object.
PatchNocStatus(ctx context.Context, oldNoc, newNoc *v1alpha1.NodeOvercommitConfig) (*v1alpha1.NodeOvercommitConfig, error)
}

Expand All @@ -40,6 +42,10 @@ func (d *DummyNocUpdater) PatchNocStatus(_ context.Context, _, newNoc *v1alpha1.
return newNoc, nil
}

// PatchNoc is a no-op implementation of NocUpdater.PatchNoc: it ignores the
// context and the old object, performs no request, and hands newNoc back
// unchanged with a nil error.
func (d *DummyNocUpdater) PatchNoc(_ context.Context, _, newNoc *v1alpha1.NodeOvercommitConfig) (*v1alpha1.NodeOvercommitConfig, error) {
return newNoc, nil
}

func NewRealNocUpdater(client clientset.Interface) NocUpdater {
return &RealNocUpdater{
client: client,
Expand Down Expand Up @@ -75,3 +81,29 @@ func (r *RealNocUpdater) PatchNocStatus(ctx context.Context, oldNoc, newNoc *v1a

return r.client.OvercommitV1alpha1().NodeOvercommitConfigs().Patch(ctx, oldNoc.Name, types.MergePatchType, patchBytes, metav1.PatchOptions{}, "status")
}

// PatchNoc computes the JSON merge patch that turns oldNoc into newNoc and
// submits it through the clientset, returning whatever the client's Patch call
// returns.
//
// Both objects are serialised with encoding/json and diffed with
// jsonpatch.CreateMergePatch. If general.JsonPathEmpty reports true for the
// resulting patch (presumably meaning "nothing to change" - confirm against
// the helper), no request is sent and oldNoc is returned as is.
//
// An error is returned when either object is nil, when serialisation or patch
// creation fails, or when the client call fails.
func (r *RealNocUpdater) PatchNoc(ctx context.Context, oldNoc, newNoc *v1alpha1.NodeOvercommitConfig) (*v1alpha1.NodeOvercommitConfig, error) {
	if oldNoc == nil || newNoc == nil {
		return nil, fmt.Errorf("can't patch a nil noc")
	}

	var (
		before, after, patch []byte
		err                  error
	)
	if before, err = json.Marshal(oldNoc); err != nil {
		return nil, err
	}
	if after, err = json.Marshal(newNoc); err != nil {
		return nil, err
	}
	if patch, err = jsonpatch.CreateMergePatch(before, after); err != nil {
		return nil, err
	}

	// Avoid a round trip when the helper says the patch carries no change.
	if general.JsonPathEmpty(patch) {
		return oldNoc, nil
	}

	nocClient := r.client.OvercommitV1alpha1().NodeOvercommitConfigs()
	return nocClient.Patch(ctx, oldNoc.Name, types.MergePatchType, patch, metav1.PatchOptions{})
}
219 changes: 219 additions & 0 deletions pkg/controller/overcommit/node/cron.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,219 @@
/*
Copyright 2022 The Katalyst Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package node

import (
"context"
"fmt"
"strconv"
"time"

"github.com/robfig/cron/v3"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/klog/v2"

"github.com/kubewharf/katalyst-api/pkg/apis/overcommit/v1alpha1"
"github.com/kubewharf/katalyst-core/pkg/metrics"
)

// Names of the counter metrics emitted by the cron worker.
const (
// cronOvercommitFail is emitted when listing configs fails or when syncing
// a single config fails.
cronOvercommitFail = "cronOvercommitFail"
// cronOvercommitSuccess is emitted at the end of a worker pass.
cronOvercommitSuccess = "cronOvercommitSuccess"
)

// cronWorker performs one pass over every NodeOvercommitConfig returned by the
// lister and runs syncTimeBounds on each of them.
//
// If listing fails, the error is logged, cronOvercommitFail is emitted and the
// pass ends. If syncing a single config fails, the error is logged and
// cronOvercommitFail is emitted with a "configName" tag; the remaining configs
// are still processed. cronOvercommitSuccess is emitted once at the end of
// every pass that got past the list call, whether or not individual configs
// failed.
//
// Errors returned by the metrics emitter are deliberately discarded.
func (nc *NodeOvercommitController) cronWorker() {
	configs, listErr := nc.nodeOvercommitLister.List(labels.Everything())
	if listErr != nil {
		klog.Errorf("list noc fail: %v", listErr)
		_ = nc.metricsEmitter.StoreInt64(cronOvercommitFail, 1, metrics.MetricTypeNameCount)
		return
	}

	for idx := range configs {
		config := configs[idx]
		if syncErr := nc.syncTimeBounds(config); syncErr != nil {
			klog.Errorf("noc %v syncTimeBounds fail: %v", config.Name, syncErr)
			nameTag := metrics.MetricTag{Key: "configName", Val: config.Name}
			_ = nc.metricsEmitter.StoreInt64(cronOvercommitFail, 1, metrics.MetricTypeNameCount, nameTag)
		}
	}

	_ = nc.metricsEmitter.StoreInt64(cronOvercommitSuccess, 1, metrics.MetricTypeNameCount)
}

// syncTimeBounds evaluates the cron-driven time bounds of noc and, if any of
// them has a run that is due, patches the resulting overcommit ratios into the
// config and then records a new last schedule time in its status.
//
// For every time bound accepted by timeBoundValid, each cron bound is asked
// (via getNextSchedule) for its most recent missed run. Bounds whose schedule
// cannot be evaluated are logged and skipped; bounds with no missed run are
// skipped. For each CPU or memory entry of such a bound, the entry wins for
// its resource when its missed run is later than the one recorded so far for
// that resource and missRunAllowed accepts it. The missed run is recorded
// before the ratio is validated, so an entry whose ratio fails to parse or is
// below 1 still counts as the latest run for that resource while leaving any
// previously chosen ratio in place.
//
// It returns nil when there is nothing to patch, otherwise the error of the
// first failing patch call.
func (nc *NodeOvercommitController) syncTimeBounds(noc *v1alpha1.NodeOvercommitConfig) error {
	if len(noc.Spec.TimeBounds) == 0 {
		return nil
	}

	now := time.Now()
	// chosenRatio holds the ratio string selected per resource; latestRun
	// remembers the missed run time that was last accepted for it.
	chosenRatio := make(map[v1.ResourceName]string)
	latestRun := map[v1.ResourceName]time.Time{}

	for _, tb := range noc.Spec.TimeBounds {
		if !timeBoundValid(now, tb) {
			continue
		}

		for _, bound := range tb.Bounds {
			missedRun, _, err := getNextSchedule(bound, now, lastScheduleTime(noc), noc.Spec.StartingDeadlineSeconds)
			if err != nil {
				klog.Error(err)
				continue
			}
			if missedRun.IsZero() {
				continue
			}

			for resource, ratioStr := range bound.ResourceOvercommitRatio {
				switch resource {
				case v1.ResourceCPU, v1.ResourceMemory:
					// supported resources, handled below
				default:
					klog.Warningf("resource %v not support", resource)
					continue
				}

				previousRun, seen := latestRun[resource]
				klog.V(6).Infof("noc %v missedRun: %v, lastMissedRun: %v, now: %v", noc.Name, missedRun, previousRun, now)

				// Only a strictly later run may replace an already recorded one.
				if seen && !missedRun.After(previousRun) {
					continue
				}
				if !missRunAllowed(noc, now, missedRun) {
					continue
				}
				latestRun[resource] = missedRun

				ratio, parseErr := strconv.ParseFloat(ratioStr, 64)
				if parseErr != nil {
					klog.Errorf("parse overcommit ratio %v fail: %v", ratioStr, parseErr)
					continue
				}
				if ratio < 1.0 {
					klog.Errorf("overcommit ratio %v is less than 1, skip", ratioStr)
					continue
				}

				chosenRatio[resource] = ratioStr
			}
		}
	}

	if len(chosenRatio) == 0 {
		return nil
	}

	// Patch the spec first; the status is only touched once that succeeded.
	if err := nc.patchNocResourceOvercommitRatio(noc, chosenRatio); err != nil {
		return err
	}
	return nc.patchNocLastScheduleTime(noc)
}

// patchNocResourceOvercommitRatio writes the given per-resource ratios into
// Spec.ResourceOvercommitRatio of a deep copy of noc and patches the config
// from noc to that copy through the controller's NocUpdater.
//
// Entries already present in the spec for other resources are kept; entries
// for the same resource are overwritten. The patch call runs under a
// one-second timeout derived from the controller context. A failure is logged
// and returned.
func (nc *NodeOvercommitController) patchNocResourceOvercommitRatio(noc *v1alpha1.NodeOvercommitConfig, resourceOvercommitRatio map[v1.ResourceName]string) error {
	desired := noc.DeepCopy()
	if desired.Spec.ResourceOvercommitRatio == nil {
		desired.Spec.ResourceOvercommitRatio = make(map[v1.ResourceName]string)
	}
	for name, ratio := range resourceOvercommitRatio {
		desired.Spec.ResourceOvercommitRatio[name] = ratio
	}

	patchCtx, cancel := context.WithTimeout(nc.ctx, time.Second)
	defer cancel()

	if _, err := nc.nocUpdater.PatchNoc(patchCtx, noc, desired); err != nil {
		klog.Errorf("noc %v patchNocResourceOvercommitRatio fail: %v", noc.Name, err)
		return err
	}
	return nil
}

// patchNocLastScheduleTime sets Status.LastScheduleTime to the current time on
// a deep copy of noc and patches the status from noc to that copy through the
// controller's NocUpdater.
//
// The patch call runs under a one-second timeout derived from the controller
// context. A failure is logged and returned.
func (nc *NodeOvercommitController) patchNocLastScheduleTime(noc *v1alpha1.NodeOvercommitConfig) error {
	desired := noc.DeepCopy()
	desired.Status.LastScheduleTime = &metav1.Time{Time: time.Now()}

	patchCtx, cancel := context.WithTimeout(nc.ctx, time.Second)
	defer cancel()

	if _, err := nc.nocUpdater.PatchNocStatus(patchCtx, noc, desired); err != nil {
		klog.Errorf("noc %v patchNocLastScheduleTime fail: %v", noc.Name, err)
		return err
	}
	return nil
}

// missRunAllowed reports whether a run scheduled at lastRunTime may still be
// applied at time now.
//
// When Spec.CronConsistency is set on the config every missed run is accepted;
// otherwise the run is accepted only if it lies less than one minute before
// now.
func missRunAllowed(noc *v1alpha1.NodeOvercommitConfig, now, lastRunTime time.Time) bool {
	return noc.Spec.CronConsistency || now.Sub(lastRunTime) < time.Minute
}

// timeBoundValid reports whether now falls inside the window described by
// bound.
//
// A zero Start or End leaves that side of the window open, so a bound with
// neither set is always valid. Both limits are exclusive: now must be strictly
// after Start and strictly before End.
func timeBoundValid(now time.Time, bound v1alpha1.TimeBound) bool {
	openStart, openEnd := bound.Start.IsZero(), bound.End.IsZero()

	switch {
	case openStart && openEnd:
		return true
	case openStart:
		return now.Before(bound.End.Time)
	case openEnd:
		return now.After(bound.Start.Time)
	}

	klog.V(6).Infof("timeBoundValid, now: %v, start: %v, end: %v", now, bound.Start.Time, bound.End.Time)
	return now.After(bound.Start.Time) && now.Before(bound.End.Time)
}

// getNextSchedule evaluates the cron expression of bound against the window
// between earliestTime and now.
//
// Parameters:
//   - bound: its CronTab field is parsed with cron.ParseStandard.
//   - now: the reference time of the evaluation.
//   - earliestTime: the start of the window in which missed runs are looked for.
//   - schedulingDeadlineSeconds: when non-nil, the window start is moved
//     forward to now minus that many seconds if that is later than earliestTime.
//
// Returns:
//   - lastMissed: the latest schedule time found by stepping the schedule from
//     the window start up to and including now; the zero time if there is none
//     or if the window start lies after now.
//   - next: the schedule's next time after now.
//   - err: non-nil if the cron expression cannot be parsed (the parse error is
//     wrapped, so errors.Is / errors.As can inspect it) or if more than 100
//     schedule times fall inside the window.
func getNextSchedule(bound v1alpha1.Bound, now time.Time, earliestTime time.Time, schedulingDeadlineSeconds *int64) (lastMissed time.Time, next time.Time, err error) {
	sched, err := cron.ParseStandard(bound.CronTab)
	if err != nil {
		// %w keeps the original parse error in the chain; the rendered
		// message is unchanged.
		return time.Time{}, time.Time{}, fmt.Errorf("ParseStandard fail, cronTab: %v, err: %w", bound.CronTab, err)
	}

	if schedulingDeadlineSeconds != nil {
		schedulingDeadline := now.Add(-time.Second * time.Duration(*schedulingDeadlineSeconds))
		if schedulingDeadline.After(earliestTime) {
			earliestTime = schedulingDeadline
		}
	}
	if earliestTime.After(now) {
		// Nothing can have been missed in an empty window.
		return time.Time{}, sched.Next(now), nil
	}

	// Step through every schedule time in the window, remembering the last
	// one. The step count is capped so that a very old window start cannot
	// make this loop run for a long time.
	missedCount := 0
	for t := sched.Next(earliestTime); !t.After(now); t = sched.Next(t) {
		lastMissed = t
		missedCount++
		if missedCount > 100 {
			return time.Time{}, time.Time{}, fmt.Errorf("too many missed jobs (> 100). Set or decrease .spec.startingDeadlineSeconds or check clock skew")
		}
	}
	return lastMissed, sched.Next(now), nil
}

// lastScheduleTime returns the time recorded in Status.LastScheduleTime of
// noc, falling back to the object's creation timestamp when no schedule time
// has been recorded yet.
func lastScheduleTime(noc *v1alpha1.NodeOvercommitConfig) time.Time {
	recorded := noc.Status.LastScheduleTime
	if recorded == nil {
		return noc.CreationTimestamp.Time
	}
	return recorded.Time
}
Loading

0 comments on commit 5e1420d

Please sign in to comment.