-
Notifications
You must be signed in to change notification settings - Fork 112
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat(orm): support outofband-resource-manager #406
Merged
waynepeking348
merged 6 commits into
kubewharf:main
from
WangZzzhe:dev/outofband-resource-manager
Jan 4, 2024
Merged
Changes from 4 commits
Commits
Show all changes
6 commits
Select commit
Hold shift + click to select a range
ea8504b
feat(ORM):add outofband resource manager
WangZzzhe 06c70ea
bugfix: generate podlevel cgroup path with pod prefix
WangZzzhe 3763554
trigger
WangZzzhe ca29648
add comment for ORMResourceNamesMap flag
WangZzzhe 0bfafcb
check qoslevel by qos config in orm
WangZzzhe 2e93922
init qosconfig in ut
WangZzzhe File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,44 @@ | ||
/* | ||
Copyright 2022 The Katalyst Authors. | ||
|
||
Licensed under the Apache License, Version 2.0 (the "License"); | ||
you may not use this file except in compliance with the License. | ||
You may obtain a copy of the License at | ||
|
||
http://www.apache.org/licenses/LICENSE-2.0 | ||
|
||
Unless required by applicable law or agreed to in writing, software | ||
distributed under the License is distributed on an "AS IS" BASIS, | ||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
See the License for the specific language governing permissions and | ||
limitations under the License. | ||
*/ | ||
|
||
package agent | ||
|
||
import ( | ||
"fmt" | ||
|
||
"k8s.io/klog/v2" | ||
plugincache "k8s.io/kubernetes/pkg/kubelet/pluginmanager/cache" | ||
|
||
"github.com/kubewharf/katalyst-core/pkg/agent/resourcemanager/outofband" | ||
"github.com/kubewharf/katalyst-core/pkg/config" | ||
) | ||
|
||
const ( | ||
// ORMAgent is the string identifier "katalyst-agent-orm" for the out-of-band
// resource manager component.
// NOTE(review): presumably the name under which InitORM is registered as an
// agent component — confirm against the registration site.
ORMAgent = "katalyst-agent-orm" | ||
) | ||
|
||
func InitORM(agentCtx *GenericContext, conf *config.Configuration, _ interface{}, _ string) (bool, Component, error) { | ||
m, err := outofband.NewManager(conf.PluginRegistrationDir+"/kubelet.sock", agentCtx.EmitterPool.GetDefaultMetricsEmitter(), agentCtx.MetaServer, conf) | ||
if err != nil { | ||
return false, ComponentStub{}, fmt.Errorf("failed to init ORM: %v", err) | ||
} | ||
|
||
if klog.V(5).Enabled() { | ||
klog.Infof("InitORM GetHandlerType: %v", m.GetHandlerType()) | ||
} | ||
agentCtx.PluginManager.AddHandler(m.GetHandlerType(), plugincache.PluginHandler(m)) | ||
return true, m, nil | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,61 @@ | ||
/* | ||
Copyright 2022 The Katalyst Authors. | ||
|
||
Licensed under the Apache License, Version 2.0 (the "License"); | ||
you may not use this file except in compliance with the License. | ||
You may obtain a copy of the License at | ||
|
||
http://www.apache.org/licenses/LICENSE-2.0 | ||
|
||
Unless required by applicable law or agreed to in writing, software | ||
distributed under the License is distributed on an "AS IS" BASIS, | ||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
See the License for the specific language governing permissions and | ||
limitations under the License. | ||
*/ | ||
|
||
package orm | ||
|
||
import ( | ||
"time" | ||
|
||
cliflag "k8s.io/component-base/cli/flag" | ||
|
||
ormconfig "github.com/kubewharf/katalyst-core/pkg/config/agent/orm" | ||
) | ||
|
||
// GenericORMPluginOptions holds the command-line configurable settings of the
// out-of-band resource manager (ORM).
type GenericORMPluginOptions struct { | ||
// ORMRconcilePeriod is the ORM resource reconcile period
// (flag --orm-reconcile-period).
ORMRconcilePeriod time.Duration | ||
// ORMResourceNamesMap maps resource names during allocation
// (flag --orm-resource-names-map).
ORMResourceNamesMap map[string]string | ||
// ORMPodNotifyChanLen is the length of the pod addition and movement
// notifying channel (flag --orm-pod-notify-chan-len).
ORMPodNotifyChanLen int | ||
} | ||
|
||
func NewGenericORMPluginOptions() *GenericORMPluginOptions { | ||
return &GenericORMPluginOptions{ | ||
ORMRconcilePeriod: time.Second * 5, | ||
ORMResourceNamesMap: map[string]string{}, | ||
ORMPodNotifyChanLen: 10, | ||
} | ||
} | ||
|
||
func (o *GenericORMPluginOptions) AddFlags(fss *cliflag.NamedFlagSets) { | ||
fs := fss.FlagSet("orm") | ||
|
||
fs.DurationVar(&o.ORMRconcilePeriod, "orm-reconcile-period", | ||
o.ORMRconcilePeriod, "orm resource reconcile period") | ||
fs.Var(cliflag.NewMapStringString(&o.ORMResourceNamesMap), "orm-resource-names-map", | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. why not use StringToStringVar directly? |
||
"A set of ResourceName=ResourceQuantity pairs that map resource name during QoS Resource Manager allocation period. "+ | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Would it be more accurate to use Out-of-band Resource Manager instead of QoS Resource Manager in the message? |
||
"e.g. 'resource.katalyst.kubewharf.io/reclaimed_millicpu=cpu,resource.katalyst.kubewharf.io/reclaimed_memory=memory' "+ | ||
"should be set for that reclaimed_cores pods with resources [resource.katalyst.kubewharf.io/reclaimed_millicpu] and [resource.katalyst.kubewharf.io/reclaimed_memory]"+ | ||
"will also be allocated by [cpu] and [memory] QRM plugins") | ||
fs.IntVar(&o.ORMPodNotifyChanLen, "orm-pod-notify-chan-len", | ||
o.ORMPodNotifyChanLen, "length of pod addition and movement notifying channel") | ||
} | ||
|
||
func (o *GenericORMPluginOptions) ApplyTo(conf *ormconfig.GenericORMConfiguration) error { | ||
conf.ORMRconcilePeriod = o.ORMRconcilePeriod | ||
conf.ORMResourceNamesMap = o.ORMResourceNamesMap | ||
conf.ORMPodNotifyChanLen = o.ORMPodNotifyChanLen | ||
|
||
return nil | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,73 @@ | ||
/* | ||
Copyright 2022 The Katalyst Authors. | ||
|
||
Licensed under the Apache License, Version 2.0 (the "License"); | ||
you may not use this file except in compliance with the License. | ||
You may obtain a copy of the License at | ||
|
||
http://www.apache.org/licenses/LICENSE-2.0 | ||
|
||
Unless required by applicable law or agreed to in writing, software | ||
distributed under the License is distributed on an "AS IS" BASIS, | ||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
See the License for the specific language governing permissions and | ||
limitations under the License. | ||
*/ | ||
|
||
package outofband | ||
|
||
import ( | ||
"fmt" | ||
"path/filepath" | ||
|
||
"k8s.io/klog/v2" | ||
"k8s.io/kubernetes/pkg/kubelet/checkpointmanager/errors" | ||
|
||
"github.com/kubewharf/katalyst-core/pkg/agent/resourcemanager/outofband/checkpoint" | ||
"github.com/kubewharf/katalyst-core/pkg/agent/resourcemanager/outofband/endpoint" | ||
"github.com/kubewharf/katalyst-core/pkg/consts" | ||
) | ||
|
||
func (m *ManagerImpl) checkpointFile() string { | ||
return filepath.Join(m.socketdir, consts.KubeletQoSResourceManagerCheckpoint) | ||
} | ||
|
||
func (m *ManagerImpl) writeCheckpoint() error { | ||
data := checkpoint.New(m.podResources.toCheckpointData()) | ||
err := m.checkpointManager.CreateCheckpoint(consts.KubeletQoSResourceManagerCheckpoint, data) | ||
if err != nil { | ||
err = fmt.Errorf("[ORM] failed to write checkpoint file %q: %v", consts.KubeletQoSResourceManagerCheckpoint, err) | ||
klog.Warning(err) | ||
return err | ||
} | ||
return nil | ||
} | ||
|
||
func (m *ManagerImpl) readCheckpoint() error { | ||
resEntries := make([]checkpoint.PodResourcesEntry, 0) | ||
cp := checkpoint.New(resEntries) | ||
err := m.checkpointManager.GetCheckpoint(consts.KubeletQoSResourceManagerCheckpoint, cp) | ||
if err != nil { | ||
if err == errors.ErrCheckpointNotFound { | ||
klog.Warningf("[ORM] Failed to retrieve checkpoint for %q: %v", consts.KubeletQoSResourceManagerCheckpoint, err) | ||
return nil | ||
} | ||
return err | ||
} | ||
|
||
podResources := cp.GetData() | ||
klog.V(5).Infof("[ORM] read checkpoint %v", podResources) | ||
m.podResources.fromCheckpointData(podResources) | ||
|
||
m.mutex.Lock() | ||
|
||
allocatedResourceNames := m.podResources.allAllocatedResourceNames() | ||
|
||
for _, allocatedResourceName := range allocatedResourceNames.UnsortedList() { | ||
m.endpoints[allocatedResourceName] = endpoint.EndpointInfo{E: endpoint.NewStoppedEndpointImpl(allocatedResourceName), Opts: nil} | ||
} | ||
|
||
m.mutex.Unlock() | ||
|
||
return nil | ||
} |
81 changes: 81 additions & 0 deletions
81
pkg/agent/resourcemanager/outofband/checkpoint/checkpoint.go
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,81 @@ | ||
/* | ||
Copyright 2022 The Katalyst Authors. | ||
|
||
Licensed under the Apache License, Version 2.0 (the "License"); | ||
you may not use this file except in compliance with the License. | ||
You may obtain a copy of the License at | ||
|
||
http://www.apache.org/licenses/LICENSE-2.0 | ||
|
||
Unless required by applicable law or agreed to in writing, software | ||
distributed under the License is distributed on an "AS IS" BASIS, | ||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
See the License for the specific language governing permissions and | ||
limitations under the License. | ||
*/ | ||
|
||
package checkpoint | ||
|
||
import ( | ||
"encoding/json" | ||
|
||
"k8s.io/kubernetes/pkg/kubelet/checkpointmanager" | ||
"k8s.io/kubernetes/pkg/kubelet/checkpointmanager/checksum" | ||
) | ||
|
||
// ResourceManagerCheckpoint extends checkpointmanager.Checkpoint with GetData,
// which exposes the pod resource entries carried by the checkpoint.
type ResourceManagerCheckpoint interface { | ||
checkpointmanager.Checkpoint | ||
// GetData returns the pod resource entries held by the checkpoint.
GetData() []PodResourcesEntry | ||
} | ||
|
||
// PodResourcesEntry connects pod information to resources: one entry ties a
// (pod UID, container name, resource name) triple to its allocation info.
type PodResourcesEntry struct { | ||
PodUID string | ||
ContainerName string | ||
ResourceName string | ||
// AllocationInfo is stored as an opaque string.
// NOTE(review): presumably a serialized allocation message — confirm
// against the code that produces checkpoint data.
AllocationInfo string | ||
} | ||
|
||
// checkpointData stores pod-to-resource allocation information in a
// checkpoint file. It is the payload over which the checksum is computed.
// TODO: add version control when we need to change checkpoint format.
type checkpointData struct { | ||
PodResourceEntries []PodResourcesEntry | ||
} | ||
|
||
// Data holds the checkpoint payload together with its checksum. The checksum
// is recomputed from the payload by MarshalCheckpoint and checked against the
// payload by VerifyChecksum.
type Data struct { | ||
Data checkpointData | ||
Checksum checksum.Checksum | ||
} | ||
|
||
// New returns an instance of Checkpoint | ||
func New(resEntries []PodResourcesEntry) ResourceManagerCheckpoint { | ||
return &Data{ | ||
Data: checkpointData{ | ||
PodResourceEntries: resEntries, | ||
}, | ||
} | ||
} | ||
|
||
// MarshalCheckpoint returns marshaled data | ||
func (cp *Data) MarshalCheckpoint() ([]byte, error) { | ||
cp.Checksum = checksum.New(cp.Data) | ||
return json.Marshal(*cp) | ||
} | ||
|
||
// UnmarshalCheckpoint returns unmarshalled data | ||
func (cp *Data) UnmarshalCheckpoint(blob []byte) error { | ||
return json.Unmarshal(blob, cp) | ||
} | ||
|
||
// VerifyChecksum verifies that passed checksum is same as calculated checksum | ||
func (cp *Data) VerifyChecksum() error { | ||
return cp.Checksum.Verify(cp.Data) | ||
} | ||
|
||
// GetData returns resource entries and registered resources | ||
func (cp *Data) GetData() []PodResourcesEntry { | ||
return cp.Data.PodResourceEntries | ||
} |
45 changes: 45 additions & 0 deletions
45
pkg/agent/resourcemanager/outofband/checkpoint/checkpoint_test.go
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,45 @@ | ||
/* | ||
Copyright 2022 The Katalyst Authors. | ||
|
||
Licensed under the Apache License, Version 2.0 (the "License"); | ||
you may not use this file except in compliance with the License. | ||
You may obtain a copy of the License at | ||
|
||
http://www.apache.org/licenses/LICENSE-2.0 | ||
|
||
Unless required by applicable law or agreed to in writing, software | ||
distributed under the License is distributed on an "AS IS" BASIS, | ||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
See the License for the specific language governing permissions and | ||
limitations under the License. | ||
*/ | ||
|
||
package checkpoint | ||
|
||
import ( | ||
"testing" | ||
|
||
"github.com/stretchr/testify/assert" | ||
) | ||
|
||
var testCheckpointData = `{"Data":{"PodResourceEntries":[{"PodUID":"447b997f-26a2-4e46-8370-d7af4cc78ad2","ContainerName":"liveness-probe","ResourceName":"memory","AllocationInfo":"\n\nCpusetMems\u0018\u0001*\u00010"},{"PodUID":"447b997f-26a2-4e46-8370-d7af4cc78ad2","ContainerName":"liveness-probe","ResourceName":"cpu","AllocationInfo":"\n\nCpusetCpus\u0018\u0001!\u0000\u0000\u0000\u0000\u0000\u0000\u001c@*\u00031-7"},{"PodUID":"2df9cabd-5212-4436-a089-89fcab46bfc7","ContainerName":"server","ResourceName":"cpu","AllocationInfo":"\n\nCpusetCpus\u0018\u0001!\u0000\u0000\u0000\u0000\u0000\u0000\u0010@*\u00034-7"},{"PodUID":"2df9cabd-5212-4436-a089-89fcab46bfc7","ContainerName":"server","ResourceName":"memory","AllocationInfo":"\n\nCpusetMems\u0018\u0001*\u00010"},{"PodUID":"46d9160c-096e-4d63-8b94-766acc9062fc","ContainerName":"kube-flannel","ResourceName":"memory","AllocationInfo":"\n\nCpusetMems\u0018\u0001*\u00010"},{"PodUID":"0eeeaa35-b954-4498-bf63-6cf6cfbdc620","ContainerName":"server","ResourceName":"memory","AllocationInfo":"\n\nCpusetMems\u0018\u0001*\u00010"},{"PodUID":"0eeeaa35-b954-4498-bf63-6cf6cfbdc620","ContainerName":"server","ResourceName":"cpu","AllocationInfo":"\n\nCpusetCpus\u0018\u0001!\u0000\u0000\u0000\u0000\u0000\u0000\u0010@*\u00034-7"},{"PodUID":"95df62b2-87a7-49f6-91b7-c801f9630c93","ContainerName":"inspector","ResourceName":"memory","AllocationInfo":"\n\nCpusetMems\u0018\u0001*\u00010"},{"PodUID":"6d37ac5f-ff2c-4afa-8df7-18cd06ec53a7","ContainerName":"coredns","ResourceName":"memory","AllocationInfo":"\n\nCpusetMems\u0018\u0001*\u00010"},{"PodUID":"6d37ac5f-ff2c-4afa-8df7-18cd06ec53a7","ContainerName":"coredns","ResourceName":"cpu","AllocationInfo":"\n\nCpusetCpus\u0018\u0001!\u0000\u0000\u0000\u0000\u0000\u0000\u0010@*\u00034-7"},{"PodUID":"185fec2a-69eb-4af3-aa95-967837c5d527","ContainerName":"csi-provisioner","ResourceName":"cpu","AllocationInfo":"\n\nCpusetCpus\u0018\u0001!\u0000\u0000\u0000\u0000\u0000\u0000\u0010@*\u00034-7"},{"PodUID"
:"185fec2a-69eb-4af3-aa95-967837c5d527","ContainerName":"csi-provisioner","ResourceName":"memory","AllocationInfo":"\n\nCpusetMems\u0018\u0001*\u00010"},{"PodUID":"185fec2a-69eb-4af3-aa95-967837c5d527","ContainerName":"csi-attacher","ResourceName":"memory","AllocationInfo":"\n\nCpusetMems\u0018\u0001*\u00010"},{"PodUID":"185fec2a-69eb-4af3-aa95-967837c5d527","ContainerName":"csi-attacher","ResourceName":"cpu","AllocationInfo":"\n\nCpusetCpus\u0018\u0001!\u0000\u0000\u0000\u0000\u0000\u0000\u0010@*\u00034-7"},{"PodUID":"185fec2a-69eb-4af3-aa95-967837c5d527","ContainerName":"liveness-probe","ResourceName":"cpu","AllocationInfo":"\n\nCpusetCpus\u0018\u0001!\u0000\u0000\u0000\u0000\u0000\u0000\u0010@*\u00034-7"},{"PodUID":"185fec2a-69eb-4af3-aa95-967837c5d527","ContainerName":"liveness-probe","ResourceName":"memory","AllocationInfo":"\n\nCpusetMems\u0018\u0001*\u00010"},{"PodUID":"185fec2a-69eb-4af3-aa95-967837c5d527","ContainerName":"csi-nas-driver","ResourceName":"memory","AllocationInfo":"\n\nCpusetMems\u0018\u0001*\u00010"},{"PodUID":"185fec2a-69eb-4af3-aa95-967837c5d527","ContainerName":"csi-nas-driver","ResourceName":"cpu","AllocationInfo":"\n\nCpusetCpus\u0018\u0001!\u0000\u0000\u0000\u0000\u0000\u0000\u0010@*\u00034-7"},{"PodUID":"0893ea73-8650-4067-9097-95faf85b2a16","ContainerName":"testcontainer","ResourceName":"cpu","AllocationInfo":"\n\nCpusetCpus\u0018\u0001!\u0000\u0000\u0000\u0000\u0000\u0000\u0010@*\u00034-7"},{"PodUID":"0893ea73-8650-4067-9097-95faf85b2a16","ContainerName":"testcontainer","ResourceName":"memory","AllocationInfo":"\n\nCpusetMems\u0018\u0001*\u00010"},{"PodUID":"743fb5e0-bba1-47fb-81d6-8e38136b5453","ContainerName":"liveness-probe","ResourceName":"memory","AllocationInfo":"\n\nCpusetMems\u0018\u0001*\u00010"}]},"Checksum":3996144592}` | ||
|
||
func TestData_UnmarshalCheckpoint(t *testing.T) { | ||
t.Parallel() | ||
|
||
entries := make([]PodResourcesEntry, 0) | ||
|
||
data := New(entries) | ||
err := data.UnmarshalCheckpoint([]byte(testCheckpointData)) | ||
assert.NoError(t, err) | ||
|
||
bytes, err := data.MarshalCheckpoint() | ||
assert.NoError(t, err) | ||
assert.Equal(t, []byte(testCheckpointData), bytes) | ||
|
||
err = data.VerifyChecksum() | ||
assert.NoError(t, err) | ||
|
||
entries = data.GetData() | ||
assert.Equal(t, 21, len(entries)) | ||
} |
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Will this socket conflict with any already-existing ones, such as those used by qrm, agent, or sysadvisor?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No, "/var/lib/katalyst/plugin-socks/kubelet.sock" is currently used only by ORM. The other sockets are:
cpuPlugin: qrm_cpu_plugin_dynamic.sock
memoryPlugin: qrm_memory_plugin_dynamic.sock
headroom reporter: headroom-reporter-plugin.sock
kubelet QRM: /var/lib/kubelet