Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(orm): support outofband-resource-manager #406

Merged
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 44 additions & 0 deletions cmd/katalyst-agent/app/agent/orm.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
/*
Copyright 2022 The Katalyst Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package agent

import (
"fmt"

"k8s.io/klog/v2"
plugincache "k8s.io/kubernetes/pkg/kubelet/pluginmanager/cache"

"github.com/kubewharf/katalyst-core/pkg/agent/resourcemanager/outofband"
"github.com/kubewharf/katalyst-core/pkg/config"
)

const (
// ORMAgent is the name under which the out-of-band resource manager (ORM)
// agent component is registered; InitORM is the initializer stored for it.
ORMAgent = "katalyst-agent-orm"
)

func InitORM(agentCtx *GenericContext, conf *config.Configuration, _ interface{}, _ string) (bool, Component, error) {
m, err := outofband.NewManager(conf.PluginRegistrationDir+"/kubelet.sock", agentCtx.EmitterPool.GetDefaultMetricsEmitter(), agentCtx.MetaServer, conf)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will this socket conflict with already-existing ones, such as those used by qrm, agent, or sysadvisor?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, "/var/lib/katalyst/plugin-socks/kubelet.sock" is currently used only by ORM.
cpuPlugin: qrm_cpu_plugin_dynamic.sock
memoryPlugin: qrm_memory_plugin_dynamic.sock
headroom reporter: headroom-reporter-plugin.sock
kubelet QRM: /var/lib/kubelet

if err != nil {
return false, ComponentStub{}, fmt.Errorf("failed to init ORM: %v", err)
}

if klog.V(5).Enabled() {
klog.Infof("InitORM GetHandlerType: %v", m.GetHandlerType())
}
agentCtx.PluginManager.AddHandler(m.GetHandlerType(), plugincache.PluginHandler(m))
return true, m, nil
}
2 changes: 2 additions & 0 deletions cmd/katalyst-agent/app/enableagents.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,8 @@ func init() {
agentInitializers.Store(agent.QoSSysAdvisor, AgentStarter{Init: agent.InitSysAdvisor})
agentInitializers.Store(phconsts.PeriodicalHandlerManagerName, AgentStarter{Init: periodicalhandler.NewPeriodicalHandlerManager})

agentInitializers.Store(agent.ORMAgent, AgentStarter{Init: agent.InitORM})

// qrm plugins are registered at top level of agent
agentInitializers.Store(qrm.QRMPluginNameCPU, AgentStarter{Init: qrm.InitQRMCPUPlugins})
agentInitializers.Store(qrm.QRMPluginNameMemory, AgentStarter{Init: qrm.InitQRMMemoryPlugins})
Expand Down
6 changes: 6 additions & 0 deletions cmd/katalyst-agent/app/options/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"github.com/kubewharf/katalyst-core/cmd/katalyst-agent/app/options/dynamic"
"github.com/kubewharf/katalyst-core/cmd/katalyst-agent/app/options/eviction"
"github.com/kubewharf/katalyst-core/cmd/katalyst-agent/app/options/global"
"github.com/kubewharf/katalyst-core/cmd/katalyst-agent/app/options/orm"
"github.com/kubewharf/katalyst-core/cmd/katalyst-agent/app/options/qrm"
"github.com/kubewharf/katalyst-core/cmd/katalyst-agent/app/options/reporter"
"github.com/kubewharf/katalyst-core/cmd/katalyst-agent/app/options/sysadvisor"
Expand Down Expand Up @@ -54,6 +55,8 @@ type Options struct {

genericQRMPluginOptions *qrm.GenericQRMPluginOptions
qrmPluginsOptions *qrm.QRMPluginsOptions

ormOptions *orm.GenericORMPluginOptions
}

// NewOptions creates a new Options with a default config.
Expand All @@ -74,6 +77,7 @@ func NewOptions() *Options {
sysadvisorPluginsOptions: sysadvisor.NewSysAdvisorPluginsOptions(),
genericQRMPluginOptions: qrm.NewGenericQRMPluginOptions(),
qrmPluginsOptions: qrm.NewQRMPluginsOptions(),
ormOptions: orm.NewGenericORMPluginOptions(),
}
}

Expand All @@ -93,6 +97,7 @@ func (o *Options) AddFlags(fss *cliflag.NamedFlagSets) {
o.sysadvisorPluginsOptions.AddFlags(fss)
o.genericQRMPluginOptions.AddFlags(fss)
o.qrmPluginsOptions.AddFlags(fss)
o.ormOptions.AddFlags(fss)
}

// ApplyTo fills up config with options
Expand All @@ -113,6 +118,7 @@ func (o *Options) ApplyTo(c *config.Configuration) error {
errList = append(errList, o.sysadvisorPluginsOptions.ApplyTo(c.SysAdvisorPluginsConfiguration))
errList = append(errList, o.genericQRMPluginOptions.ApplyTo(c.GenericQRMPluginConfiguration))
errList = append(errList, o.qrmPluginsOptions.ApplyTo(c.QRMPluginsConfiguration))
errList = append(errList, o.ormOptions.ApplyTo(c.GenericORMConfiguration))

return errors.NewAggregate(errList)
}
Expand Down
61 changes: 61 additions & 0 deletions cmd/katalyst-agent/app/options/orm/orm_base.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
/*
Copyright 2022 The Katalyst Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package orm

import (
"time"

cliflag "k8s.io/component-base/cli/flag"

ormconfig "github.com/kubewharf/katalyst-core/pkg/config/agent/orm"
)

// GenericORMPluginOptions holds the command-line configurable settings of the
// out-of-band resource manager (ORM). The values are exposed as flags by
// AddFlags and copied into the agent configuration by ApplyTo.
type GenericORMPluginOptions struct {
// ORMRconcilePeriod is the ORM resource reconcile period
// (flag --orm-reconcile-period).
ORMRconcilePeriod time.Duration
// ORMResourceNamesMap maps one resource name to another during allocation
// (flag --orm-resource-names-map), e.g.
// resource.katalyst.kubewharf.io/reclaimed_millicpu=cpu.
ORMResourceNamesMap map[string]string
// ORMPodNotifyChanLen is the length of the pod addition and movement
// notifying channel (flag --orm-pod-notify-chan-len).
ORMPodNotifyChanLen int
}

// NewGenericORMPluginOptions builds the ORM options pre-populated with their
// defaults: a 5 second reconcile period, an empty (non-nil) resource names
// map, and a pod notify channel length of 10.
func NewGenericORMPluginOptions() *GenericORMPluginOptions {
	opts := new(GenericORMPluginOptions)
	opts.ORMRconcilePeriod = 5 * time.Second
	opts.ORMResourceNamesMap = make(map[string]string)
	opts.ORMPodNotifyChanLen = 10
	return opts
}

func (o *GenericORMPluginOptions) AddFlags(fss *cliflag.NamedFlagSets) {
fs := fss.FlagSet("orm")

fs.DurationVar(&o.ORMRconcilePeriod, "orm-reconcile-period",
o.ORMRconcilePeriod, "orm resource reconcile period")
fs.Var(cliflag.NewMapStringString(&o.ORMResourceNamesMap), "orm-resource-names-map",
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why not use StringToStringVar directly?

"A set of ResourceName=ResourceQuantity pairs that map resource name during QoS Resource Manager allocation period. "+
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would it be more accurate to use Out-of-band Resource Manager instead of QoS Resource Manager in the message?

"e.g. 'resource.katalyst.kubewharf.io/reclaimed_millicpu=cpu,resource.katalyst.kubewharf.io/reclaimed_memory=memory' "+
"should be set for that reclaimed_cores pods with resources [resource.katalyst.kubewharf.io/reclaimed_millicpu] and [resource.katalyst.kubewharf.io/reclaimed_memory]"+
"will also be allocated by [cpu] and [memory] QRM plugins")
fs.IntVar(&o.ORMPodNotifyChanLen, "orm-pod-notify-chan-len",
o.ORMPodNotifyChanLen, "length of pod addition and movement notifying channel")
}

// ApplyTo copies the ORM option values into conf. The resource names map is
// assigned as-is (not cloned), so conf and the options share the same map.
// It always returns nil.
func (o *GenericORMPluginOptions) ApplyTo(conf *ormconfig.GenericORMConfiguration) error {
	conf.ORMRconcilePeriod, conf.ORMResourceNamesMap, conf.ORMPodNotifyChanLen =
		o.ORMRconcilePeriod, o.ORMResourceNamesMap, o.ORMPodNotifyChanLen

	return nil
}
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ require (
github.com/kubewharf/katalyst-api v0.3.4-0.20231204022248-bfefbbd96f52
github.com/montanaflynn/stats v0.7.1
github.com/opencontainers/runc v1.1.6
github.com/opencontainers/selinux v1.10.0
github.com/pkg/errors v0.9.1
github.com/prometheus/client_golang v1.12.1
github.com/prometheus/client_model v0.2.0
Expand Down Expand Up @@ -101,7 +102,6 @@ require (
github.com/mwitkow/go-conntrack v0.0.0-20190716064945-2f068394615f // indirect
github.com/opencontainers/go-digest v1.0.0 // indirect
github.com/opencontainers/runtime-spec v1.0.3-0.20210326190908-1c3f411f0417 // indirect
github.com/opencontainers/selinux v1.10.0 // indirect
github.com/opentracing/opentracing-go v1.2.0 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/prometheus/procfs v0.7.3 // indirect
Expand Down
73 changes: 73 additions & 0 deletions pkg/agent/resourcemanager/outofband/checkpoint.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
/*
Copyright 2022 The Katalyst Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package outofband

import (
"fmt"
"path/filepath"

"k8s.io/klog/v2"
"k8s.io/kubernetes/pkg/kubelet/checkpointmanager/errors"

"github.com/kubewharf/katalyst-core/pkg/agent/resourcemanager/outofband/checkpoint"
"github.com/kubewharf/katalyst-core/pkg/agent/resourcemanager/outofband/endpoint"
"github.com/kubewharf/katalyst-core/pkg/consts"
)

// checkpointFile returns the checkpoint file path: the manager's socket
// directory joined with consts.KubeletQoSResourceManagerCheckpoint.
func (m *ManagerImpl) checkpointFile() string {
	dir, name := m.socketdir, consts.KubeletQoSResourceManagerCheckpoint
	return filepath.Join(dir, name)
}

// writeCheckpoint snapshots the manager's pod resources into a checkpoint and
// stores it under consts.KubeletQoSResourceManagerCheckpoint.
//
// On failure the error is logged as a warning and returned. The returned error
// wraps the checkpoint manager's error, so callers can still inspect the
// underlying cause with errors.Is / errors.As.
func (m *ManagerImpl) writeCheckpoint() error {
	data := checkpoint.New(m.podResources.toCheckpointData())
	if err := m.checkpointManager.CreateCheckpoint(consts.KubeletQoSResourceManagerCheckpoint, data); err != nil {
		// %w keeps the original error in the chain; the rendered message is
		// identical to what %v produced.
		err = fmt.Errorf("[ORM] failed to write checkpoint file %q: %w", consts.KubeletQoSResourceManagerCheckpoint, err)
		klog.Warning(err)
		return err
	}
	return nil
}

// readCheckpoint loads the checkpoint stored under
// consts.KubeletQoSResourceManagerCheckpoint and restores the manager's pod
// resources from it.
//
// A missing checkpoint is not treated as a failure: a warning is logged and nil
// is returned. Any other retrieval error is returned unchanged. After a
// successful restore, every resource name that has an allocation gets a
// stopped endpoint registered in m.endpoints (under m.mutex).
func (m *ManagerImpl) readCheckpoint() error {
	cp := checkpoint.New(make([]checkpoint.PodResourcesEntry, 0))

	if err := m.checkpointManager.GetCheckpoint(consts.KubeletQoSResourceManagerCheckpoint, cp); err != nil {
		if err != errors.ErrCheckpointNotFound {
			return err
		}
		// No checkpoint yet: nothing to restore.
		klog.Warningf("[ORM] Failed to retrieve checkpoint for %q: %v", consts.KubeletQoSResourceManagerCheckpoint, err)
		return nil
	}

	restored := cp.GetData()
	klog.V(5).Infof("[ORM] read checkpoint %v", restored)
	m.podResources.fromCheckpointData(restored)

	// The endpoint map is only touched while holding the manager mutex; the
	// allocated resource names are also collected inside the critical section.
	m.mutex.Lock()
	for _, name := range m.podResources.allAllocatedResourceNames().UnsortedList() {
		m.endpoints[name] = endpoint.EndpointInfo{
			E:    endpoint.NewStoppedEndpointImpl(name),
			Opts: nil,
		}
	}
	m.mutex.Unlock()

	return nil
}
81 changes: 81 additions & 0 deletions pkg/agent/resourcemanager/outofband/checkpoint/checkpoint.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
/*
Copyright 2022 The Katalyst Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package checkpoint

import (
"encoding/json"

"k8s.io/kubernetes/pkg/kubelet/checkpointmanager"
"k8s.io/kubernetes/pkg/kubelet/checkpointmanager/checksum"
)

// ResourceManagerCheckpoint is a checkpointmanager.Checkpoint that additionally
// exposes the pod resource entries it carries.
type ResourceManagerCheckpoint interface {
checkpointmanager.Checkpoint
// GetData returns the pod resource entries held by the checkpoint.
GetData() []PodResourcesEntry
}

// PodResourcesEntry connects pod information to resources: one entry records
// the allocation of a single resource for a single container of a pod.
type PodResourcesEntry struct {
// PodUID is the UID of the pod the entry belongs to.
PodUID string
// ContainerName is the name of the container within that pod.
ContainerName string
// ResourceName is the name of the allocated resource (e.g. "cpu", "memory").
ResourceName string
// AllocationInfo is the allocation payload stored as a string.
// NOTE(review): presumably a serialized allocation record — confirm the
// encoding against the code that produces it.
AllocationInfo string
}

// checkpointData is the payload stored in a checkpoint file: the list of
// pod-to-resource allocation entries. It is the value the checkpoint checksum
// is computed over.
// TODO: add version control when we need to change checkpoint format.
type checkpointData struct {
// PodResourceEntries lists every recorded pod/container/resource allocation.
PodResourceEntries []PodResourcesEntry
}

// Data holds checkpoint data and its checksum. It is the concrete type
// returned by New and the value that is JSON-encoded into the checkpoint.
type Data struct {
// Data is the checkpoint payload.
Data checkpointData
// Checksum is the checksum of Data; it is recomputed by MarshalCheckpoint
// and checked by VerifyChecksum.
Checksum checksum.Checksum
}

// New wraps the given pod resource entries in a checkpoint. The checksum is
// left at its zero value; MarshalCheckpoint fills it in.
func New(resEntries []PodResourcesEntry) ResourceManagerCheckpoint {
	cp := new(Data)
	cp.Data.PodResourceEntries = resEntries
	return cp
}

// MarshalCheckpoint recomputes the checksum over the current payload, stores
// it on the receiver, and returns the JSON encoding of the checkpoint.
func (cp *Data) MarshalCheckpoint() ([]byte, error) {
	sum := checksum.New(cp.Data)
	cp.Checksum = sum

	snapshot := *cp
	return json.Marshal(snapshot)
}

// UnmarshalCheckpoint decodes the JSON blob into the receiver and reports any
// decoding error.
func (cp *Data) UnmarshalCheckpoint(blob []byte) error {
	err := json.Unmarshal(blob, cp)
	return err
}

// VerifyChecksum checks the stored checksum against the current payload and
// returns the verification error, if any.
func (cp *Data) VerifyChecksum() error {
	payload := cp.Data
	return cp.Checksum.Verify(payload)
}

// GetData returns the pod resource entries stored in the checkpoint.
func (cp *Data) GetData() []PodResourcesEntry {
	entries := cp.Data.PodResourceEntries
	return entries
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
/*
Copyright 2022 The Katalyst Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package checkpoint

import (
"testing"

"github.com/stretchr/testify/assert"
)

var testCheckpointData = `{"Data":{"PodResourceEntries":[{"PodUID":"447b997f-26a2-4e46-8370-d7af4cc78ad2","ContainerName":"liveness-probe","ResourceName":"memory","AllocationInfo":"\n\nCpusetMems\u0018\u0001*\u00010"},{"PodUID":"447b997f-26a2-4e46-8370-d7af4cc78ad2","ContainerName":"liveness-probe","ResourceName":"cpu","AllocationInfo":"\n\nCpusetCpus\u0018\u0001!\u0000\u0000\u0000\u0000\u0000\u0000\u001c@*\u00031-7"},{"PodUID":"2df9cabd-5212-4436-a089-89fcab46bfc7","ContainerName":"server","ResourceName":"cpu","AllocationInfo":"\n\nCpusetCpus\u0018\u0001!\u0000\u0000\u0000\u0000\u0000\u0000\u0010@*\u00034-7"},{"PodUID":"2df9cabd-5212-4436-a089-89fcab46bfc7","ContainerName":"server","ResourceName":"memory","AllocationInfo":"\n\nCpusetMems\u0018\u0001*\u00010"},{"PodUID":"46d9160c-096e-4d63-8b94-766acc9062fc","ContainerName":"kube-flannel","ResourceName":"memory","AllocationInfo":"\n\nCpusetMems\u0018\u0001*\u00010"},{"PodUID":"0eeeaa35-b954-4498-bf63-6cf6cfbdc620","ContainerName":"server","ResourceName":"memory","AllocationInfo":"\n\nCpusetMems\u0018\u0001*\u00010"},{"PodUID":"0eeeaa35-b954-4498-bf63-6cf6cfbdc620","ContainerName":"server","ResourceName":"cpu","AllocationInfo":"\n\nCpusetCpus\u0018\u0001!\u0000\u0000\u0000\u0000\u0000\u0000\u0010@*\u00034-7"},{"PodUID":"95df62b2-87a7-49f6-91b7-c801f9630c93","ContainerName":"inspector","ResourceName":"memory","AllocationInfo":"\n\nCpusetMems\u0018\u0001*\u00010"},{"PodUID":"6d37ac5f-ff2c-4afa-8df7-18cd06ec53a7","ContainerName":"coredns","ResourceName":"memory","AllocationInfo":"\n\nCpusetMems\u0018\u0001*\u00010"},{"PodUID":"6d37ac5f-ff2c-4afa-8df7-18cd06ec53a7","ContainerName":"coredns","ResourceName":"cpu","AllocationInfo":"\n\nCpusetCpus\u0018\u0001!\u0000\u0000\u0000\u0000\u0000\u0000\u0010@*\u00034-7"},{"PodUID":"185fec2a-69eb-4af3-aa95-967837c5d527","ContainerName":"csi-provisioner","ResourceName":"cpu","AllocationInfo":"\n\nCpusetCpus\u0018\u0001!\u0000\u0000\u0000\u0000\u0000\u0000\u0010@*\u00034-7"},{"PodUID"
:"185fec2a-69eb-4af3-aa95-967837c5d527","ContainerName":"csi-provisioner","ResourceName":"memory","AllocationInfo":"\n\nCpusetMems\u0018\u0001*\u00010"},{"PodUID":"185fec2a-69eb-4af3-aa95-967837c5d527","ContainerName":"csi-attacher","ResourceName":"memory","AllocationInfo":"\n\nCpusetMems\u0018\u0001*\u00010"},{"PodUID":"185fec2a-69eb-4af3-aa95-967837c5d527","ContainerName":"csi-attacher","ResourceName":"cpu","AllocationInfo":"\n\nCpusetCpus\u0018\u0001!\u0000\u0000\u0000\u0000\u0000\u0000\u0010@*\u00034-7"},{"PodUID":"185fec2a-69eb-4af3-aa95-967837c5d527","ContainerName":"liveness-probe","ResourceName":"cpu","AllocationInfo":"\n\nCpusetCpus\u0018\u0001!\u0000\u0000\u0000\u0000\u0000\u0000\u0010@*\u00034-7"},{"PodUID":"185fec2a-69eb-4af3-aa95-967837c5d527","ContainerName":"liveness-probe","ResourceName":"memory","AllocationInfo":"\n\nCpusetMems\u0018\u0001*\u00010"},{"PodUID":"185fec2a-69eb-4af3-aa95-967837c5d527","ContainerName":"csi-nas-driver","ResourceName":"memory","AllocationInfo":"\n\nCpusetMems\u0018\u0001*\u00010"},{"PodUID":"185fec2a-69eb-4af3-aa95-967837c5d527","ContainerName":"csi-nas-driver","ResourceName":"cpu","AllocationInfo":"\n\nCpusetCpus\u0018\u0001!\u0000\u0000\u0000\u0000\u0000\u0000\u0010@*\u00034-7"},{"PodUID":"0893ea73-8650-4067-9097-95faf85b2a16","ContainerName":"testcontainer","ResourceName":"cpu","AllocationInfo":"\n\nCpusetCpus\u0018\u0001!\u0000\u0000\u0000\u0000\u0000\u0000\u0010@*\u00034-7"},{"PodUID":"0893ea73-8650-4067-9097-95faf85b2a16","ContainerName":"testcontainer","ResourceName":"memory","AllocationInfo":"\n\nCpusetMems\u0018\u0001*\u00010"},{"PodUID":"743fb5e0-bba1-47fb-81d6-8e38136b5453","ContainerName":"liveness-probe","ResourceName":"memory","AllocationInfo":"\n\nCpusetMems\u0018\u0001*\u00010"}]},"Checksum":3996144592}`

func TestData_UnmarshalCheckpoint(t *testing.T) {
t.Parallel()

entries := make([]PodResourcesEntry, 0)

data := New(entries)
err := data.UnmarshalCheckpoint([]byte(testCheckpointData))
assert.NoError(t, err)

bytes, err := data.MarshalCheckpoint()
assert.NoError(t, err)
assert.Equal(t, []byte(testCheckpointData), bytes)

err = data.VerifyChecksum()
assert.NoError(t, err)

entries = data.GetData()
assert.Equal(t, 21, len(entries))
}
Loading