diff --git a/cmd/katalyst-agent/app/agent/orm.go b/cmd/katalyst-agent/app/agent/orm.go new file mode 100644 index 000000000..42fb19058 --- /dev/null +++ b/cmd/katalyst-agent/app/agent/orm.go @@ -0,0 +1,44 @@ +/* +Copyright 2022 The Katalyst Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package agent + +import ( + "fmt" + + "k8s.io/klog/v2" + plugincache "k8s.io/kubernetes/pkg/kubelet/pluginmanager/cache" + + "github.com/kubewharf/katalyst-core/pkg/agent/resourcemanager/outofband" + "github.com/kubewharf/katalyst-core/pkg/config" +) + +const ( + ORMAgent = "katalyst-agent-orm" +) + +func InitORM(agentCtx *GenericContext, conf *config.Configuration, _ interface{}, _ string) (bool, Component, error) { + m, err := outofband.NewManager(conf.PluginRegistrationDir+"/kubelet.sock", agentCtx.EmitterPool.GetDefaultMetricsEmitter(), agentCtx.MetaServer, conf) + if err != nil { + return false, ComponentStub{}, fmt.Errorf("failed to init ORM: %v", err) + } + + if klog.V(5).Enabled() { + klog.Infof("InitORM GetHandlerType: %v", m.GetHandlerType()) + } + agentCtx.PluginManager.AddHandler(m.GetHandlerType(), plugincache.PluginHandler(m)) + return true, m, nil +} diff --git a/cmd/katalyst-agent/app/enableagents.go b/cmd/katalyst-agent/app/enableagents.go index c1cdcf78e..357e152be 100644 --- a/cmd/katalyst-agent/app/enableagents.go +++ b/cmd/katalyst-agent/app/enableagents.go @@ -49,6 +49,8 @@ func init() { agentInitializers.Store(agent.QoSSysAdvisor, AgentStarter{Init: 
agent.InitSysAdvisor}) agentInitializers.Store(phconsts.PeriodicalHandlerManagerName, AgentStarter{Init: periodicalhandler.NewPeriodicalHandlerManager}) + agentInitializers.Store(agent.ORMAgent, AgentStarter{Init: agent.InitORM}) + // qrm plugins are registered at top level of agent agentInitializers.Store(qrm.QRMPluginNameCPU, AgentStarter{Init: qrm.InitQRMCPUPlugins}) agentInitializers.Store(qrm.QRMPluginNameMemory, AgentStarter{Init: qrm.InitQRMMemoryPlugins}) diff --git a/cmd/katalyst-agent/app/options/options.go b/cmd/katalyst-agent/app/options/options.go index 9daacec9b..0e8b29e93 100644 --- a/cmd/katalyst-agent/app/options/options.go +++ b/cmd/katalyst-agent/app/options/options.go @@ -24,6 +24,7 @@ import ( "github.com/kubewharf/katalyst-core/cmd/katalyst-agent/app/options/dynamic" "github.com/kubewharf/katalyst-core/cmd/katalyst-agent/app/options/eviction" "github.com/kubewharf/katalyst-core/cmd/katalyst-agent/app/options/global" + "github.com/kubewharf/katalyst-core/cmd/katalyst-agent/app/options/orm" "github.com/kubewharf/katalyst-core/cmd/katalyst-agent/app/options/qrm" "github.com/kubewharf/katalyst-core/cmd/katalyst-agent/app/options/reporter" "github.com/kubewharf/katalyst-core/cmd/katalyst-agent/app/options/sysadvisor" @@ -54,6 +55,8 @@ type Options struct { genericQRMPluginOptions *qrm.GenericQRMPluginOptions qrmPluginsOptions *qrm.QRMPluginsOptions + + ormOptions *orm.GenericORMPluginOptions } // NewOptions creates a new Options with a default config. 
@@ -74,6 +77,7 @@ func NewOptions() *Options { sysadvisorPluginsOptions: sysadvisor.NewSysAdvisorPluginsOptions(), genericQRMPluginOptions: qrm.NewGenericQRMPluginOptions(), qrmPluginsOptions: qrm.NewQRMPluginsOptions(), + ormOptions: orm.NewGenericORMPluginOptions(), } } @@ -93,6 +97,7 @@ func (o *Options) AddFlags(fss *cliflag.NamedFlagSets) { o.sysadvisorPluginsOptions.AddFlags(fss) o.genericQRMPluginOptions.AddFlags(fss) o.qrmPluginsOptions.AddFlags(fss) + o.ormOptions.AddFlags(fss) } // ApplyTo fills up config with options @@ -113,6 +118,7 @@ func (o *Options) ApplyTo(c *config.Configuration) error { errList = append(errList, o.sysadvisorPluginsOptions.ApplyTo(c.SysAdvisorPluginsConfiguration)) errList = append(errList, o.genericQRMPluginOptions.ApplyTo(c.GenericQRMPluginConfiguration)) errList = append(errList, o.qrmPluginsOptions.ApplyTo(c.QRMPluginsConfiguration)) + errList = append(errList, o.ormOptions.ApplyTo(c.GenericORMConfiguration)) return errors.NewAggregate(errList) } diff --git a/cmd/katalyst-agent/app/options/orm/orm_base.go b/cmd/katalyst-agent/app/options/orm/orm_base.go new file mode 100644 index 000000000..48eece781 --- /dev/null +++ b/cmd/katalyst-agent/app/options/orm/orm_base.go @@ -0,0 +1,61 @@ +/* +Copyright 2022 The Katalyst Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package orm + +import ( + "time" + + cliflag "k8s.io/component-base/cli/flag" + + ormconfig "github.com/kubewharf/katalyst-core/pkg/config/agent/orm" +) + +type GenericORMPluginOptions struct { + ORMRconcilePeriod time.Duration + ORMResourceNamesMap map[string]string + ORMPodNotifyChanLen int +} + +func NewGenericORMPluginOptions() *GenericORMPluginOptions { + return &GenericORMPluginOptions{ + ORMRconcilePeriod: time.Second * 5, + ORMResourceNamesMap: map[string]string{}, + ORMPodNotifyChanLen: 10, + } +} + +func (o *GenericORMPluginOptions) AddFlags(fss *cliflag.NamedFlagSets) { + fs := fss.FlagSet("orm") + + fs.DurationVar(&o.ORMRconcilePeriod, "orm-reconcile-period", + o.ORMRconcilePeriod, "orm resource reconcile period") + fs.StringToStringVar(&o.ORMResourceNamesMap, "orm-resource-names-map", o.ORMResourceNamesMap, + "A set of ResourceName=ResourceQuantity pairs that map resource name during out-of-band Resource Manager allocation period. "+ + "e.g. 'resource.katalyst.kubewharf.io/reclaimed_millicpu=cpu,resource.katalyst.kubewharf.io/reclaimed_memory=memory' "+ + "should be set for that reclaimed_cores pods with resources [resource.katalyst.kubewharf.io/reclaimed_millicpu] and [resource.katalyst.kubewharf.io/reclaimed_memory]"+ + "will also be allocated by [cpu] and [memory] QRM plugins") + fs.IntVar(&o.ORMPodNotifyChanLen, "orm-pod-notify-chan-len", + o.ORMPodNotifyChanLen, "length of pod addition and movement notifying channel") +} + +func (o *GenericORMPluginOptions) ApplyTo(conf *ormconfig.GenericORMConfiguration) error { + conf.ORMRconcilePeriod = o.ORMRconcilePeriod + conf.ORMResourceNamesMap = o.ORMResourceNamesMap + conf.ORMPodNotifyChanLen = o.ORMPodNotifyChanLen + + return nil +} diff --git a/go.mod b/go.mod index 08701a1f3..56a481039 100644 --- a/go.mod +++ b/go.mod @@ -15,6 +15,7 @@ require ( github.com/kubewharf/katalyst-api v0.3.4-0.20231204022248-bfefbbd96f52 github.com/montanaflynn/stats v0.7.1 github.com/opencontainers/runc v1.1.6 + 
github.com/opencontainers/selinux v1.10.0 github.com/pkg/errors v0.9.1 github.com/prometheus/client_golang v1.12.1 github.com/prometheus/client_model v0.2.0 @@ -101,7 +102,6 @@ require ( github.com/mwitkow/go-conntrack v0.0.0-20190716064945-2f068394615f // indirect github.com/opencontainers/go-digest v1.0.0 // indirect github.com/opencontainers/runtime-spec v1.0.3-0.20210326190908-1c3f411f0417 // indirect - github.com/opencontainers/selinux v1.10.0 // indirect github.com/opentracing/opentracing-go v1.2.0 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect github.com/prometheus/procfs v0.7.3 // indirect diff --git a/pkg/agent/resourcemanager/outofband/checkpoint.go b/pkg/agent/resourcemanager/outofband/checkpoint.go new file mode 100644 index 000000000..8e8a49e39 --- /dev/null +++ b/pkg/agent/resourcemanager/outofband/checkpoint.go @@ -0,0 +1,73 @@ +/* +Copyright 2022 The Katalyst Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package outofband + +import ( + "fmt" + "path/filepath" + + "k8s.io/klog/v2" + "k8s.io/kubernetes/pkg/kubelet/checkpointmanager/errors" + + "github.com/kubewharf/katalyst-core/pkg/agent/resourcemanager/outofband/checkpoint" + "github.com/kubewharf/katalyst-core/pkg/agent/resourcemanager/outofband/endpoint" + "github.com/kubewharf/katalyst-core/pkg/consts" +) + +func (m *ManagerImpl) checkpointFile() string { + return filepath.Join(m.socketdir, consts.KubeletQoSResourceManagerCheckpoint) +} + +func (m *ManagerImpl) writeCheckpoint() error { + data := checkpoint.New(m.podResources.toCheckpointData()) + err := m.checkpointManager.CreateCheckpoint(consts.KubeletQoSResourceManagerCheckpoint, data) + if err != nil { + err = fmt.Errorf("[ORM] failed to write checkpoint file %q: %v", consts.KubeletQoSResourceManagerCheckpoint, err) + klog.Warning(err) + return err + } + return nil +} + +func (m *ManagerImpl) readCheckpoint() error { + resEntries := make([]checkpoint.PodResourcesEntry, 0) + cp := checkpoint.New(resEntries) + err := m.checkpointManager.GetCheckpoint(consts.KubeletQoSResourceManagerCheckpoint, cp) + if err != nil { + if err == errors.ErrCheckpointNotFound { + klog.Warningf("[ORM] Failed to retrieve checkpoint for %q: %v", consts.KubeletQoSResourceManagerCheckpoint, err) + return nil + } + return err + } + + podResources := cp.GetData() + klog.V(5).Infof("[ORM] read checkpoint %v", podResources) + m.podResources.fromCheckpointData(podResources) + + m.mutex.Lock() + + allocatedResourceNames := m.podResources.allAllocatedResourceNames() + + for _, allocatedResourceName := range allocatedResourceNames.UnsortedList() { + m.endpoints[allocatedResourceName] = endpoint.EndpointInfo{E: endpoint.NewStoppedEndpointImpl(allocatedResourceName), Opts: nil} + } + + m.mutex.Unlock() + + return nil +} diff --git a/pkg/agent/resourcemanager/outofband/checkpoint/checkpoint.go b/pkg/agent/resourcemanager/outofband/checkpoint/checkpoint.go new file mode 100644 index 
// ResourceManagerCheckpoint is a checkpointmanager.Checkpoint that
// additionally exposes the pod resource entries it carries.
type ResourceManagerCheckpoint interface {
	checkpointmanager.Checkpoint
	// GetData returns the pod resource entries held by the checkpoint.
	GetData() []PodResourcesEntry
}

// PodResourcesEntry is one checkpoint record: the allocation of a single
// resource to a single container of a pod.
// NOTE(review): AllocationInfo looks like a serialized allocation payload
// stored as a string — confirm against the code that produces the entries.
type PodResourcesEntry struct {
	PodUID         string
	ContainerName  string
	ResourceName   string
	AllocationInfo string
}

// checkpointData is the payload stored in the checkpoint file; it is the
// value the checksum is computed over.
// TODO: add version control when we need to change checkpoint format.
type checkpointData struct {
	PodResourceEntries []PodResourcesEntry
}

// Data holds the checkpoint payload together with its checksum. It is the
// value that is marshaled to and unmarshaled from JSON.
type Data struct {
	Data     checkpointData
	Checksum checksum.Checksum
}

// New returns a checkpoint wrapping resEntries. The checksum is left at its
// zero value; it is computed by MarshalCheckpoint.
func New(resEntries []PodResourcesEntry) ResourceManagerCheckpoint {
	return &Data{
		Data: checkpointData{
			PodResourceEntries: resEntries,
		},
	}
}

// MarshalCheckpoint recomputes the checksum over the payload, stores it in
// cp, and returns cp encoded as JSON.
func (cp *Data) MarshalCheckpoint() ([]byte, error) {
	cp.Checksum = checksum.New(cp.Data)
	return json.Marshal(*cp)
}

// UnmarshalCheckpoint decodes the JSON blob into cp. It does not verify the
// checksum; call VerifyChecksum for that.
func (cp *Data) UnmarshalCheckpoint(blob []byte) error {
	return json.Unmarshal(blob, cp)
}

// VerifyChecksum verifies that the stored checksum matches the one
// calculated from the current payload.
func (cp *Data) VerifyChecksum() error {
	return cp.Checksum.Verify(cp.Data)
}

// GetData returns the pod resource entries held by the checkpoint.
func (cp *Data) GetData() []PodResourcesEntry {
	return cp.Data.PodResourceEntries
}
+*/ + +package checkpoint + +import ( + "testing" + + "github.com/stretchr/testify/assert" +) + +var testCheckpointData = `{"Data":{"PodResourceEntries":[{"PodUID":"447b997f-26a2-4e46-8370-d7af4cc78ad2","ContainerName":"liveness-probe","ResourceName":"memory","AllocationInfo":"\n\nCpusetMems\u0018\u0001*\u00010"},{"PodUID":"447b997f-26a2-4e46-8370-d7af4cc78ad2","ContainerName":"liveness-probe","ResourceName":"cpu","AllocationInfo":"\n\nCpusetCpus\u0018\u0001!\u0000\u0000\u0000\u0000\u0000\u0000\u001c@*\u00031-7"},{"PodUID":"2df9cabd-5212-4436-a089-89fcab46bfc7","ContainerName":"server","ResourceName":"cpu","AllocationInfo":"\n\nCpusetCpus\u0018\u0001!\u0000\u0000\u0000\u0000\u0000\u0000\u0010@*\u00034-7"},{"PodUID":"2df9cabd-5212-4436-a089-89fcab46bfc7","ContainerName":"server","ResourceName":"memory","AllocationInfo":"\n\nCpusetMems\u0018\u0001*\u00010"},{"PodUID":"46d9160c-096e-4d63-8b94-766acc9062fc","ContainerName":"kube-flannel","ResourceName":"memory","AllocationInfo":"\n\nCpusetMems\u0018\u0001*\u00010"},{"PodUID":"0eeeaa35-b954-4498-bf63-6cf6cfbdc620","ContainerName":"server","ResourceName":"memory","AllocationInfo":"\n\nCpusetMems\u0018\u0001*\u00010"},{"PodUID":"0eeeaa35-b954-4498-bf63-6cf6cfbdc620","ContainerName":"server","ResourceName":"cpu","AllocationInfo":"\n\nCpusetCpus\u0018\u0001!\u0000\u0000\u0000\u0000\u0000\u0000\u0010@*\u00034-7"},{"PodUID":"95df62b2-87a7-49f6-91b7-c801f9630c93","ContainerName":"inspector","ResourceName":"memory","AllocationInfo":"\n\nCpusetMems\u0018\u0001*\u00010"},{"PodUID":"6d37ac5f-ff2c-4afa-8df7-18cd06ec53a7","ContainerName":"coredns","ResourceName":"memory","AllocationInfo":"\n\nCpusetMems\u0018\u0001*\u00010"},{"PodUID":"6d37ac5f-ff2c-4afa-8df7-18cd06ec53a7","ContainerName":"coredns","ResourceName":"cpu","AllocationInfo":"\n\nCpusetCpus\u0018\u0001!\u0000\u0000\u0000\u0000\u0000\u0000\u0010@*\u00034-7"},{"PodUID":"185fec2a-69eb-4af3-aa95-967837c5d527","ContainerName":"csi-provisioner","ResourceName":"cpu","AllocationIn
fo":"\n\nCpusetCpus\u0018\u0001!\u0000\u0000\u0000\u0000\u0000\u0000\u0010@*\u00034-7"},{"PodUID":"185fec2a-69eb-4af3-aa95-967837c5d527","ContainerName":"csi-provisioner","ResourceName":"memory","AllocationInfo":"\n\nCpusetMems\u0018\u0001*\u00010"},{"PodUID":"185fec2a-69eb-4af3-aa95-967837c5d527","ContainerName":"csi-attacher","ResourceName":"memory","AllocationInfo":"\n\nCpusetMems\u0018\u0001*\u00010"},{"PodUID":"185fec2a-69eb-4af3-aa95-967837c5d527","ContainerName":"csi-attacher","ResourceName":"cpu","AllocationInfo":"\n\nCpusetCpus\u0018\u0001!\u0000\u0000\u0000\u0000\u0000\u0000\u0010@*\u00034-7"},{"PodUID":"185fec2a-69eb-4af3-aa95-967837c5d527","ContainerName":"liveness-probe","ResourceName":"cpu","AllocationInfo":"\n\nCpusetCpus\u0018\u0001!\u0000\u0000\u0000\u0000\u0000\u0000\u0010@*\u00034-7"},{"PodUID":"185fec2a-69eb-4af3-aa95-967837c5d527","ContainerName":"liveness-probe","ResourceName":"memory","AllocationInfo":"\n\nCpusetMems\u0018\u0001*\u00010"},{"PodUID":"185fec2a-69eb-4af3-aa95-967837c5d527","ContainerName":"csi-nas-driver","ResourceName":"memory","AllocationInfo":"\n\nCpusetMems\u0018\u0001*\u00010"},{"PodUID":"185fec2a-69eb-4af3-aa95-967837c5d527","ContainerName":"csi-nas-driver","ResourceName":"cpu","AllocationInfo":"\n\nCpusetCpus\u0018\u0001!\u0000\u0000\u0000\u0000\u0000\u0000\u0010@*\u00034-7"},{"PodUID":"0893ea73-8650-4067-9097-95faf85b2a16","ContainerName":"testcontainer","ResourceName":"cpu","AllocationInfo":"\n\nCpusetCpus\u0018\u0001!\u0000\u0000\u0000\u0000\u0000\u0000\u0010@*\u00034-7"},{"PodUID":"0893ea73-8650-4067-9097-95faf85b2a16","ContainerName":"testcontainer","ResourceName":"memory","AllocationInfo":"\n\nCpusetMems\u0018\u0001*\u00010"},{"PodUID":"743fb5e0-bba1-47fb-81d6-8e38136b5453","ContainerName":"liveness-probe","ResourceName":"memory","AllocationInfo":"\n\nCpusetMems\u0018\u0001*\u00010"}]},"Checksum":3996144592}` + +func TestData_UnmarshalCheckpoint(t *testing.T) { + t.Parallel() + + entries := make([]PodResourcesEntry, 
0) + + data := New(entries) + err := data.UnmarshalCheckpoint([]byte(testCheckpointData)) + assert.NoError(t, err) + + bytes, err := data.MarshalCheckpoint() + assert.NoError(t, err) + assert.Equal(t, []byte(testCheckpointData), bytes) + + err = data.VerifyChecksum() + assert.NoError(t, err) + + entries = data.GetData() + assert.Equal(t, 21, len(entries)) +} diff --git a/pkg/agent/resourcemanager/outofband/checkpoint_test.go b/pkg/agent/resourcemanager/outofband/checkpoint_test.go new file mode 100644 index 000000000..329dc9af3 --- /dev/null +++ b/pkg/agent/resourcemanager/outofband/checkpoint_test.go @@ -0,0 +1,60 @@ +/* +Copyright 2022 The Katalyst Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package outofband + +import ( + "os" + "testing" + + "github.com/stretchr/testify/assert" + "k8s.io/apimachinery/pkg/util/sets" + "k8s.io/kubernetes/pkg/kubelet/checkpointmanager" + + "github.com/kubewharf/katalyst-core/pkg/agent/resourcemanager/outofband/endpoint" +) + +func TestCheckpoint(t *testing.T) { + t.Parallel() + + checkpointManager, err := checkpointmanager.NewCheckpointManager("/tmp") + assert.NoError(t, err) + + m := &ManagerImpl{ + socketdir: "/tmp", + endpoints: make(map[string]endpoint.EndpointInfo), + podResources: newPodResourcesChk(), + checkpointManager: checkpointManager, + } + defer func() { + _ = os.Remove("/tmp/kubelet_qrm_checkpoint") + }() + + file := m.checkpointFile() + assert.Equal(t, file, "/tmp/kubelet_qrm_checkpoint") + + allocationInfo := generateResourceAllocationInfo() + m.podResources.insert("testPod", "testContainer", "cpu", allocationInfo) + + err = m.writeCheckpoint() + assert.NoError(t, err) + + m.podResources = newPodResourcesChk() + err = m.readCheckpoint() + assert.NoError(t, err) + podSet := m.podResources.pods() + assert.Equal(t, podSet, sets.NewString("testPod")) +} diff --git a/pkg/agent/resourcemanager/outofband/endpoint/consts.go b/pkg/agent/resourcemanager/outofband/endpoint/consts.go new file mode 100644 index 000000000..af712079d --- /dev/null +++ b/pkg/agent/resourcemanager/outofband/endpoint/consts.go @@ -0,0 +1,33 @@ +/* +Copyright 2022 The Katalyst Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
const (
	// errFailedToDialResourcePlugin prefixes the error returned when the
	// resource plugin cannot be reached on its registered socket.
	errFailedToDialResourcePlugin = "failed to dial resource plugin:"

	// errEndpointStopped is the format string of the error returned when a
	// call is made on an endpoint that has been stopped.
	errEndpointStopped = "endpoint %v has been stopped"

	// endpointStopGracePeriod is how long after its stop time an endpoint is
	// still treated as within its grace period; StopGracePeriodExpired
	// reports true only once this much time has passed.
	endpointStopGracePeriod = 5 * time.Minute
)
It is responsible +// for managing gRPC communications with the resource plugin and caching +// resource states reported by the resource plugin. +type Endpoint interface { + Stop() + Allocate(c context.Context, resourceRequest *pluginapi.ResourceRequest) (*pluginapi.ResourceAllocationResponse, error) + GetResourceAllocation(c context.Context, request *pluginapi.GetResourcesAllocationRequest) (*pluginapi.GetResourcesAllocationResponse, error) + RemovePod(c context.Context, removePodRequest *pluginapi.RemovePodRequest) (*pluginapi.RemovePodResponse, error) + IsStopped() bool + StopGracePeriodExpired() bool + GetResourcePluginOptions(ctx context.Context, in *pluginapi.Empty, opts ...grpc.CallOption) (*pluginapi.ResourcePluginOptions, error) +} + +type EndpointInfo struct { + E Endpoint + Opts *pluginapi.ResourcePluginOptions +} + +type EndpointImpl struct { + client pluginapi.ResourcePluginClient + clientConn *grpc.ClientConn + + socketPath string + resourceName string + stopTime time.Time + + mutex sync.Mutex +} + +// NewEndpointImpl creates a new endpoint for the given resourceName. +// This is to be used during normal resource plugin registration. +func NewEndpointImpl(socketPath, resourceName string) (*EndpointImpl, error) { + client, c, err := dial(socketPath) + if err != nil { + klog.Errorf("[qosresourcemanager] Can't create new endpoint with path %s err %v", socketPath, err) + return nil, err + } + + return &EndpointImpl{ + client: client, + clientConn: c, + + socketPath: socketPath, + resourceName: resourceName, + }, nil +} + +func (e *EndpointImpl) Client() pluginapi.ResourcePluginClient { + return e.client +} + +// newStoppedEndpointImpl creates a new endpoint for the given resourceName with stopTime set. +// This is to be used during Kubelet restart, before the actual resource plugin re-registers. 
+func NewStoppedEndpointImpl(resourceName string) *EndpointImpl { + return &EndpointImpl{ + resourceName: resourceName, + stopTime: time.Now(), + } +} + +func (e *EndpointImpl) IsStopped() bool { + e.mutex.Lock() + defer e.mutex.Unlock() + return !e.stopTime.IsZero() +} + +func (e *EndpointImpl) StopGracePeriodExpired() bool { + e.mutex.Lock() + defer e.mutex.Unlock() + return !e.stopTime.IsZero() && time.Since(e.stopTime) > endpointStopGracePeriod +} + +// used for testing only +func (e *EndpointImpl) setStopTime(t time.Time) { + e.mutex.Lock() + defer e.mutex.Unlock() + e.stopTime = t +} + +// allocate issues Allocate gRPC call to the resource plugin. +func (e *EndpointImpl) Allocate(c context.Context, resourceRequest *pluginapi.ResourceRequest) (*pluginapi.ResourceAllocationResponse, error) { + if e.IsStopped() { + return nil, fmt.Errorf(errEndpointStopped, e) + } + ctx, cancel := context.WithTimeout(c, pluginapi.KubeletResourcePluginAllocateRPCTimeoutInSecs*time.Second) + defer cancel() + return e.client.Allocate(ctx, resourceRequest) +} + +func (e *EndpointImpl) Stop() { + e.mutex.Lock() + defer e.mutex.Unlock() + if e.clientConn != nil { + e.clientConn.Close() + } + e.stopTime = time.Now() +} + +func (e *EndpointImpl) GetResourceAllocation(c context.Context, request *pluginapi.GetResourcesAllocationRequest) (*pluginapi.GetResourcesAllocationResponse, error) { + if e.IsStopped() { + return nil, fmt.Errorf(errEndpointStopped, e) + } + ctx, cancel := context.WithTimeout(c, pluginapi.KubeletResourcePluginGetResourcesAllocationRPCTimeoutInSecs*time.Second) + defer cancel() + return e.client.GetResourcesAllocation(ctx, request) +} + +func (e *EndpointImpl) RemovePod(c context.Context, removePodRequest *pluginapi.RemovePodRequest) (*pluginapi.RemovePodResponse, error) { + if e.IsStopped() { + return nil, fmt.Errorf(errEndpointStopped, e) + } + ctx, cancel := context.WithTimeout(c, pluginapi.KubeletResourcePluginRemovePodRPCTimeoutInSecs*time.Second) + defer cancel() 
+ return e.client.RemovePod(ctx, removePodRequest) +} + +func (e *EndpointImpl) GetResourcePluginOptions(ctx context.Context, in *pluginapi.Empty, opts ...grpc.CallOption) (*pluginapi.ResourcePluginOptions, error) { + return e.client.GetResourcePluginOptions(ctx, in, opts...) +} + +// dial establishes the gRPC communication with the registered resource plugin. https://godoc.org/google.golang.org/grpc#Dial +func dial(unixSocketPath string) (pluginapi.ResourcePluginClient, *grpc.ClientConn, error) { + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + + c, err := grpc.DialContext(ctx, unixSocketPath, grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithBlock(), + grpc.WithContextDialer(func(ctx context.Context, addr string) (net.Conn, error) { + return (&net.Dialer{}).DialContext(ctx, "unix", addr) + }), + ) + + if err != nil { + return nil, nil, fmt.Errorf(errFailedToDialResourcePlugin+" %v", err) + } + + return pluginapi.NewResourcePluginClient(c), c, nil +} diff --git a/pkg/agent/resourcemanager/outofband/endpoint/endpoint_test.go b/pkg/agent/resourcemanager/outofband/endpoint/endpoint_test.go new file mode 100644 index 000000000..0615497a0 --- /dev/null +++ b/pkg/agent/resourcemanager/outofband/endpoint/endpoint_test.go @@ -0,0 +1,196 @@ +/* +Copyright 2022 The Katalyst Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package endpoint + +import ( + "context" + "path" + "testing" + + "github.com/stretchr/testify/require" + pluginapi "k8s.io/kubelet/pkg/apis/resourceplugin/v1alpha1" +) + +var ( + eSocketName = "mock.sock" +) + +func TestNewEndpoint(t *testing.T) { + t.Parallel() + + socket := path.Join("/tmp", "TestNewEndpoint"+eSocketName) + + p, e := eSetup(t, socket, "mock") + defer eCleanup(t, p, e) +} + +func TestAllocate(t *testing.T) { + t.Parallel() + + socket := path.Join("/tmp", "TestAllocate"+eSocketName) + p, e := eSetup(t, socket, "mock") + defer eCleanup(t, p, e) + + req := generateResourceRequest() + resp := generateResourceResponse() + + p.SetAllocFunc(func(r *pluginapi.ResourceRequest) (*pluginapi.ResourceAllocationResponse, error) { + return resp, nil + }) + + respOut, err := e.Allocate(context.TODO(), req) + require.NoError(t, err) + require.Equal(t, resp, respOut) +} + +func TestNewStoppedEndpointImpl(t *testing.T) { + t.Parallel() + + ei := NewStoppedEndpointImpl("cpu") + require.NotNil(t, ei) +} + +func TestRemovePod(t *testing.T) { + t.Parallel() + + socket := path.Join("/tmp", "TestRemovePod"+eSocketName) + p, e := eSetup(t, socket, "mock") + defer eCleanup(t, p, e) + + req := generateRemovePodRequest() + resp, err := e.RemovePod(context.TODO(), req) + require.NoError(t, err) + require.NotNil(t, resp) +} + +func TestGetResourceAllocation(t *testing.T) { + t.Parallel() + + socket := path.Join("/tmp", "TestGetResourceAllocation"+eSocketName) + p, e := eSetup(t, socket, "mock") + defer eCleanup(t, p, e) + + resp := generateGetResourceAllocationInfoResponse() + p.SetGetAllocFunc(func(r *pluginapi.GetResourcesAllocationRequest) (*pluginapi.GetResourcesAllocationResponse, error) { + return resp, nil + }) + + actual, err := e.GetResourceAllocation(context.TODO(), &pluginapi.GetResourcesAllocationRequest{}) + require.NoError(t, err) + require.Equal(t, actual, resp) +} + +func TestClient(t *testing.T) { + t.Parallel() + + socket := path.Join("/tmp", 
"TestClient"+eSocketName) + p, e := eSetup(t, socket, "mock") + defer eCleanup(t, p, e) + + client := e.Client() + require.NotNil(t, client) +} + +func generateResourceRequest() *pluginapi.ResourceRequest { + return &pluginapi.ResourceRequest{ + PodUid: "mock_pod", + PodNamespace: "mock_pod_ns", + PodName: "mock_pod_name", + ContainerName: "mock_con_name", + //IsInitContainer: false, + PodRole: "mock_role", + PodType: "mock_type", + ResourceName: "mock_res", + Hint: &pluginapi.TopologyHint{ + Nodes: []uint64{0, 1}, + Preferred: true, + }, + ResourceRequests: map[string]float64{ + "mock_res": 2, + }, + } +} + +func generateResourceResponse() *pluginapi.ResourceAllocationResponse { + return &pluginapi.ResourceAllocationResponse{ + PodUid: "mock_pod", + PodNamespace: "mock_pod_ns", + PodName: "mock_pod_name", + ContainerName: "mock_con_name", + //IsInitContainer: false, + PodRole: "mock_role", + PodType: "mock_type", + ResourceName: "mock_res", + AllocationResult: &pluginapi.ResourceAllocation{ + ResourceAllocation: map[string]*pluginapi.ResourceAllocationInfo{ + "mock_res": generateResourceAllocationInfo(), + }, + }, + } +} + +func generateResourceAllocationInfo() *pluginapi.ResourceAllocationInfo { + return &pluginapi.ResourceAllocationInfo{ + OciPropertyName: "CpusetCpus", + IsNodeResource: true, + IsScalarResource: true, + AllocatedQuantity: 3, + AllocationResult: "5-6,10", + Envs: map[string]string{"mock_key": "mock_env"}, + Annotations: map[string]string{"mock_key": "mock_ano"}, + ResourceHints: &pluginapi.ListOfTopologyHints{}, + } +} + +func generateRemovePodRequest() *pluginapi.RemovePodRequest { + return &pluginapi.RemovePodRequest{ + PodUid: "mock_pod", + } +} + +func generateGetResourceAllocationInfoResponse() *pluginapi.GetResourcesAllocationResponse { + return &pluginapi.GetResourcesAllocationResponse{ + PodResources: map[string]*pluginapi.ContainerResources{ + "mock_pod": { + ContainerResources: map[string]*pluginapi.ResourceAllocation{ + 
"mock_container": { + ResourceAllocation: map[string]*pluginapi.ResourceAllocationInfo{ + "mock_res": generateResourceAllocationInfo(), + }, + }, + }, + }, + }, + } +} + +func eSetup(t *testing.T, socket, resourceName string) (*Stub, *EndpointImpl) { + p := NewResourcePluginStub(socket, resourceName, false) + + err := p.Start() + require.NoError(t, err) + + e, err := NewEndpointImpl(socket, resourceName) + require.NoError(t, err) + + return p, e +} + +func eCleanup(t *testing.T, p *Stub, e *EndpointImpl) { + p.Stop() + e.Stop() +} diff --git a/pkg/agent/resourcemanager/outofband/endpoint/resource_plugin_stub.go b/pkg/agent/resourcemanager/outofband/endpoint/resource_plugin_stub.go new file mode 100644 index 000000000..782c59f50 --- /dev/null +++ b/pkg/agent/resourcemanager/outofband/endpoint/resource_plugin_stub.go @@ -0,0 +1,261 @@ +/* +Copyright 2022 The Katalyst Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package endpoint + +import ( + "context" + "log" + "net" + "os" + "path" + "sync" + "time" + + "google.golang.org/grpc/credentials/insecure" + + "google.golang.org/grpc" + watcherapi "k8s.io/kubelet/pkg/apis/pluginregistration/v1" + pluginapi "k8s.io/kubelet/pkg/apis/resourceplugin/v1alpha1" +) + +type Stub struct { + socket string + resourceName string + preStartContainerFlag bool + + stop chan interface{} + wg sync.WaitGroup + + server *grpc.Server + + // allocFunc1 is used for handling allocation request + allocFunc1 stubAllocFunc1 + //handling get allocation request + allocFunc2 stubAllocFunc2 + + registrationStatus chan watcherapi.RegistrationStatus // for testing + endpoint string // for testing +} + +// stubAllocFunc1 is the function called when an allocation request is received from Kubelet +type stubAllocFunc1 func(r *pluginapi.ResourceRequest) (*pluginapi.ResourceAllocationResponse, error) + +// stubAllocFYnc2 is the function called when a get allocation request is received form Kubelet +type stubAllocFunc2 func(r *pluginapi.GetResourcesAllocationRequest) (*pluginapi.GetResourcesAllocationResponse, error) + +func defaultAllocFunc(r *pluginapi.ResourceRequest) (*pluginapi.ResourceAllocationResponse, error) { + var response pluginapi.ResourceAllocationResponse + + return &response, nil +} +func defaultGetAllocFunc(r *pluginapi.GetResourcesAllocationRequest) (*pluginapi.GetResourcesAllocationResponse, error) { + var response pluginapi.GetResourcesAllocationResponse + return &response, nil +} + +// NewResourcePluginStub returns an initialized ResourcePlugin Stub. 
+func NewResourcePluginStub(socket string, name string, preStartContainerFlag bool) *Stub { + return &Stub{ + socket: socket, + resourceName: name, + preStartContainerFlag: preStartContainerFlag, + + stop: make(chan interface{}), + + allocFunc1: defaultAllocFunc, + allocFunc2: defaultGetAllocFunc, + } +} + +// SetAllocFunc sets allocFunc of the resource plugin +func (m *Stub) SetAllocFunc(f stubAllocFunc1) { + m.allocFunc1 = f +} +func (m *Stub) SetGetAllocFunc(f stubAllocFunc2) { + m.allocFunc2 = f +} + +// Start starts the gRPC server of the resource plugin. Can only +// be called once. +func (m *Stub) Start() error { + err := m.cleanup() + if err != nil { + return err + } + + sock, err := net.Listen("unix", m.socket) + if err != nil { + return err + } + + m.wg.Add(1) + m.server = grpc.NewServer([]grpc.ServerOption{}...) + pluginapi.RegisterResourcePluginServer(m.server, m) + watcherapi.RegisterRegistrationServer(m.server, m) + + go func() { + defer func() { + m.wg.Done() + + if err := recover(); err != nil { + log.Fatalf("Start recover from err: %v", err) + } + }() + m.server.Serve(sock) + }() + _, conn, err := dial(m.socket) + if err != nil { + return err + } + conn.Close() + log.Printf("Starting to serve on %v", m.socket) + + return nil +} + +// Stop stops the gRPC server. Can be called without a prior Start +// and more than once. Not safe to be called concurrently by different +// goroutines! +func (m *Stub) Stop() error { + if m.server == nil { + return nil + } + m.server.Stop() + m.wg.Wait() + m.server = nil + close(m.stop) // This prevents re-starting the server. 
+ + return m.cleanup() +} + +// GetInfo is the RPC which return pluginInfo +func (m *Stub) GetInfo(ctx context.Context, req *watcherapi.InfoRequest) (*watcherapi.PluginInfo, error) { + log.Println("GetInfo") + return &watcherapi.PluginInfo{ + Type: watcherapi.ResourcePlugin, + Name: m.resourceName, + Endpoint: m.endpoint, + SupportedVersions: []string{pluginapi.Version}}, nil +} + +// NotifyRegistrationStatus receives the registration notification from watcher +func (m *Stub) NotifyRegistrationStatus(ctx context.Context, status *watcherapi.RegistrationStatus) (*watcherapi.RegistrationStatusResponse, error) { + if m.registrationStatus != nil { + m.registrationStatus <- *status + } + if !status.PluginRegistered { + log.Printf("Registration failed: %v", status.Error) + } + return &watcherapi.RegistrationStatusResponse{}, nil +} + +// Register registers the resource plugin for the given resourceName with Kubelet. +func (m *Stub) Register(kubeletEndpoint, resourceName string, pluginSockDir string) error { + if pluginSockDir != "" { + if _, err := os.Stat(pluginSockDir + "DEPRECATION"); err == nil { + log.Println("Deprecation file found. Skip registration.") + return nil + } + } + log.Println("Deprecation file not found. 
Invoke registration") + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + + conn, err := grpc.DialContext(ctx, kubeletEndpoint, grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithBlock(), + grpc.WithContextDialer(func(ctx context.Context, addr string) (net.Conn, error) { + return (&net.Dialer{}).DialContext(ctx, "unix", addr) + })) + if err != nil { + return err + } + defer conn.Close() + client := pluginapi.NewRegistrationClient(conn) + reqt := &pluginapi.RegisterRequest{ + Version: pluginapi.Version, + Endpoint: path.Base(m.socket), + ResourceName: resourceName, + Options: &pluginapi.ResourcePluginOptions{ + PreStartRequired: m.preStartContainerFlag, + }, + } + + _, err = client.Register(context.Background(), reqt) + if err != nil { + return err + } + return nil +} + +// GetResourcePluginOptions returns ResourcePluginOptions settings for the resource plugin. +func (m *Stub) GetResourcePluginOptions(ctx context.Context, e *pluginapi.Empty) (*pluginapi.ResourcePluginOptions, error) { + options := &pluginapi.ResourcePluginOptions{ + PreStartRequired: m.preStartContainerFlag, + } + return options, nil +} + +// PreStartContainer resets the resources received +func (m *Stub) PreStartContainer(ctx context.Context, r *pluginapi.PreStartContainerRequest) (*pluginapi.PreStartContainerResponse, error) { + log.Printf("PreStartContainer, %+v", r) + return &pluginapi.PreStartContainerResponse{}, nil +} + +// Allocate does a mock allocation +func (m *Stub) Allocate(ctx context.Context, r *pluginapi.ResourceRequest) (*pluginapi.ResourceAllocationResponse, error) { + log.Printf("Allocate, %+v", r) + + return m.allocFunc1(r) +} + +func (m *Stub) cleanup() error { + if err := os.Remove(m.socket); err != nil && !os.IsNotExist(err) { + return err + } + + return nil +} + +// GetResourcesAllocation returns allocation results of corresponding resources +func (m *Stub) GetResourcesAllocation(ctx context.Context, r 
*pluginapi.GetResourcesAllocationRequest) (*pluginapi.GetResourcesAllocationResponse, error) { + log.Printf("GetResourcesAllocation, %+v", r) + return m.allocFunc2(r) +} + +// GetTopologyAwareResources returns allocation results of corresponding resources as topology aware format +func (m *Stub) GetTopologyAwareResources(ctx context.Context, r *pluginapi.GetTopologyAwareResourcesRequest) (*pluginapi.GetTopologyAwareResourcesResponse, error) { + log.Printf("GetTopologyAwareResources, %+v", r) + return &pluginapi.GetTopologyAwareResourcesResponse{}, nil +} + +// GetTopologyAwareResources returns corresponding allocatable resources as topology aware format +func (m *Stub) GetTopologyAwareAllocatableResources(ctx context.Context, r *pluginapi.GetTopologyAwareAllocatableResourcesRequest) (*pluginapi.GetTopologyAwareAllocatableResourcesResponse, error) { + log.Printf("GetTopologyAwareAllocatableResources, %+v", r) + return &pluginapi.GetTopologyAwareAllocatableResourcesResponse{}, nil +} + +// GetTopologyHints returns hints of corresponding resources +func (m *Stub) GetTopologyHints(ctx context.Context, r *pluginapi.ResourceRequest) (*pluginapi.ResourceHintsResponse, error) { + log.Printf("GetTopologyHints, %+v", r) + return &pluginapi.ResourceHintsResponse{}, nil +} + +// Notify the resource plugin that the pod has beed deleted, +// and the plugin should do some clear-up work. +func (m *Stub) RemovePod(ctx context.Context, r *pluginapi.RemovePodRequest) (*pluginapi.RemovePodResponse, error) { + log.Printf("RemovePod, %+v", r) + return &pluginapi.RemovePodResponse{}, nil +} diff --git a/pkg/agent/resourcemanager/outofband/executor/executor.go b/pkg/agent/resourcemanager/outofband/executor/executor.go new file mode 100644 index 000000000..3e369b94f --- /dev/null +++ b/pkg/agent/resourcemanager/outofband/executor/executor.go @@ -0,0 +1,135 @@ +/* +Copyright 2022 The Katalyst Authors. 
+ +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package executor + +import ( + "fmt" + + v1 "k8s.io/api/core/v1" + "k8s.io/klog/v2" + "k8s.io/kubelet/pkg/apis/resourceplugin/v1alpha1" + + "github.com/kubewharf/katalyst-core/pkg/agent/qrm-plugins/util" + "github.com/kubewharf/katalyst-core/pkg/util/cgroup/common" + cgroupmgr "github.com/kubewharf/katalyst-core/pkg/util/cgroup/manager" + "github.com/kubewharf/katalyst-core/pkg/util/native" +) + +type Executor interface { + UpdateContainerResources(pod *v1.Pod, container *v1.Container, resourceAllocation map[string]*v1alpha1.ResourceAllocationInfo) error +} + +type Impl struct { + cgroupManager cgroupmgr.Manager +} + +func NewExecutor(cgroupManager cgroupmgr.Manager) Executor { + return &Impl{ + cgroupManager: cgroupManager, + } +} + +// UpdateContainerResources update container resources by resourceAllocation +func (ei *Impl) UpdateContainerResources(pod *v1.Pod, container *v1.Container, resourceAllocation map[string]*v1alpha1.ResourceAllocationInfo) error { + if pod == nil || container == nil { + klog.Warningf("UpdateContainerResources, pod or container is nil") + return nil + } + if len(resourceAllocation) == 0 { + return fmt.Errorf("empty resourceAllocation for pod: %v, container: %v", pod.Name, container.Name) + } + + var ( + CPUSetData = &common.CPUSetData{} + ) + + for _, resourceAllocationInfo := range resourceAllocation { + switch resourceAllocationInfo.OciPropertyName { + case util.OCIPropertyNameCPUSetCPUs: + if 
resourceAllocationInfo.AllocationResult != "" { + CPUSetData.CPUs = resourceAllocationInfo.AllocationResult + } + case util.OCIPropertyNameCPUSetMems: + if resourceAllocationInfo.AllocationResult != "" { + CPUSetData.Mems = resourceAllocationInfo.AllocationResult + } + default: + + } + } + + absCgroupPath, err := ei.containerCgroupPath(pod, container) + if err != nil { + klog.Errorf("[ORM] containerCgroupPath fail, pod: %v, container: %v, err: %v", pod.Name, container.Name, err) + return err + } + + err = ei.commitCPUSet(absCgroupPath, CPUSetData) + if err != nil { + klog.Errorf("[ORM] commitCPUSet fail, pod: %v, container: %v, err: %v", pod.Name, container.Name, err) + return err + } + + return nil +} + +// applyCPUSet apply CPUSet data by cgroupManager +func (ei *Impl) applyCPUSet(absCgroupPath string, data *common.CPUSetData) error { + return ei.cgroupManager.ApplyCPUSet(absCgroupPath, data) +} + +// commitCPUSet rollback if any data apply failed in data +// consider if such operation is necessary, for runc does not guarantee the atomicity of cgroup subsystem settings either +// https://github.com/opencontainers/runc/blob/main/libcontainer/cgroups/fs/cpuset.go#L27 +func (ei *Impl) commitCPUSet(absCgroupPath string, data *common.CPUSetData) error { + CPUSetStats, err := ei.cgroupManager.GetCPUSet(absCgroupPath) + if err != nil { + return err + } + + err = ei.applyCPUSet(absCgroupPath, data) + if err != nil { + // rollback + rollbackErr := ei.applyCPUSet(absCgroupPath, &common.CPUSetData{ + CPUs: CPUSetStats.CPUs, + Mems: CPUSetStats.Mems, + }) + if rollbackErr == nil { + err = fmt.Errorf("applyCPUSet fail, CPUSet rollback, err: %v", err) + return err + } else { + err = fmt.Errorf("applyCPUSet fail, rollback fail, err: %v, rollbackErr: %v", err, rollbackErr) + return err + } + } + + return nil +} + +func (ei *Impl) containerCgroupPath(pod *v1.Pod, container *v1.Container) (string, error) { + containerID, err := native.GetContainerID(pod, container.Name) + if err 
!= nil { + return "", err + } + + absCgroupPath, err := common.GetContainerAbsCgroupPath(common.CgroupSubsysCPUSet, string(pod.UID), containerID) + if err != nil { + return "", err + } + + return absCgroupPath, nil +} diff --git a/pkg/agent/resourcemanager/outofband/executor/executor_test.go b/pkg/agent/resourcemanager/outofband/executor/executor_test.go new file mode 100644 index 000000000..a6a16c21f --- /dev/null +++ b/pkg/agent/resourcemanager/outofband/executor/executor_test.go @@ -0,0 +1,100 @@ +/* +Copyright 2022 The Katalyst Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package executor + +import ( + "testing" + + "github.com/stretchr/testify/assert" + v1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/kubelet/pkg/apis/resourceplugin/v1alpha1" + + "github.com/kubewharf/katalyst-core/pkg/util/cgroup/common" + "github.com/kubewharf/katalyst-core/pkg/util/cgroup/manager" +) + +func TestImpl_UpdateContainerResources(t *testing.T) { + t.Parallel() + + impl := NewExecutor(&manager.FakeCgroupManager{}) + + err := impl.UpdateContainerResources(nil, nil, nil) + assert.Nil(t, err) + + err = impl.UpdateContainerResources(&v1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: "testPod", + UID: "testUID", + }, + Status: v1.PodStatus{ + ContainerStatuses: []v1.ContainerStatus{ + { + Name: "testContainer", + ContainerID: "containerd://containerID", + }, + }, + }, + }, + &v1.Container{ + Name: "testContainer", + }, + nil) + assert.NotNil(t, err) + + err = impl.UpdateContainerResources( + &v1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: "testPod", + UID: "testUID", + }, + Status: v1.PodStatus{ + ContainerStatuses: []v1.ContainerStatus{ + { + Name: "testContainer", + ContainerID: "containerd://containerID", + }, + }, + }, + }, + &v1.Container{ + Name: "testContainer", + }, map[string]*v1alpha1.ResourceAllocationInfo{ + "cpu": { + OciPropertyName: "CpusetCpus", + AllocationResult: "0-3", + }, + "memory": { + OciPropertyName: "CpusetMems", + AllocationResult: "0,1", + }, + }) + assert.NotNil(t, err) +} + +func TestCommitCPUSet(t *testing.T) { + t.Parallel() + + impl := &Impl{ + cgroupManager: &manager.FakeCgroupManager{}, + } + err := impl.commitCPUSet("testPath", &common.CPUSetData{ + CPUs: "0-3", + Mems: "0,1", + }) + assert.Nil(t, err) +} diff --git a/pkg/agent/resourcemanager/outofband/manager.go b/pkg/agent/resourcemanager/outofband/manager.go new file mode 100644 index 000000000..9b2c85b68 --- /dev/null +++ b/pkg/agent/resourcemanager/outofband/manager.go @@ -0,0 +1,580 @@ +/* +Copyright 2022 The Katalyst 
Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package outofband + +import ( + "context" + "fmt" + "net" + "os" + "path/filepath" + "sync" + "time" + + "github.com/opencontainers/selinux/go-selinux" + "google.golang.org/grpc" + v1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/util/wait" + "k8s.io/klog/v2" + "k8s.io/kubelet/pkg/apis/pluginregistration/v1" + pluginapi "k8s.io/kubelet/pkg/apis/resourceplugin/v1alpha1" + "k8s.io/kubernetes/pkg/kubelet/checkpointmanager" + maputil "k8s.io/kubernetes/pkg/util/maps" + + "github.com/kubewharf/katalyst-core/pkg/agent/resourcemanager/outofband/endpoint" + "github.com/kubewharf/katalyst-core/pkg/agent/resourcemanager/outofband/executor" + "github.com/kubewharf/katalyst-core/pkg/agent/resourcemanager/outofband/metamanager" + "github.com/kubewharf/katalyst-core/pkg/config" + "github.com/kubewharf/katalyst-core/pkg/config/generic" + "github.com/kubewharf/katalyst-core/pkg/metaserver" + "github.com/kubewharf/katalyst-core/pkg/metrics" + cgroupmgr "github.com/kubewharf/katalyst-core/pkg/util/cgroup/manager" + "github.com/kubewharf/katalyst-core/pkg/util/native" +) + +type ManagerImpl struct { + ctx context.Context + + socketname string + socketdir string + + // resource to QRMPlugins and executors + mutex sync.RWMutex + endpoints map[string]endpoint.EndpointInfo + resourceExecutor executor.Executor + + metaManager *metamanager.Manager + + server *grpc.Server + wg sync.WaitGroup + + podAddChan chan string + podDeleteChan chan string + + 
podResources *podResourcesChk + checkpointManager checkpointmanager.CheckpointManager + + emitter metrics.MetricEmitter + qosConfig *generic.QoSConfiguration + + reconcilePeriod time.Duration + resourceNamesMap map[string]string +} + +func NewManager(socketPath string, emitter metrics.MetricEmitter, metaServer *metaserver.MetaServer, config *config.Configuration) (*ManagerImpl, error) { + klog.V(2).Infof("new ORM..., socketPath: %v, resourceNameMap: %v, reconcilePeriod: %v", socketPath, config.ORMResourceNamesMap, config.ORMRconcilePeriod) + + if socketPath == "" || !filepath.IsAbs(socketPath) { + return nil, fmt.Errorf(errBadSocket+" %s", socketPath) + } + dir, file := filepath.Split(socketPath) + + checkpointManager, err := checkpointmanager.NewCheckpointManager(dir) + if err != nil { + return nil, fmt.Errorf("failed to initialize checkpoint manager: %v", err) + } + + m := &ManagerImpl{ + socketdir: dir, + socketname: file, + + endpoints: make(map[string]endpoint.EndpointInfo), + podResources: newPodResourcesChk(), + checkpointManager: checkpointManager, + + resourceNamesMap: config.ORMResourceNamesMap, + reconcilePeriod: config.ORMRconcilePeriod, + + podAddChan: make(chan string, config.ORMPodNotifyChanLen), + podDeleteChan: make(chan string, config.ORMPodNotifyChanLen), + emitter: emitter, + qosConfig: config.QoSConfiguration, + } + + m.resourceExecutor = executor.NewExecutor(cgroupmgr.GetManager()) + + metaManager := metamanager.NewManager(emitter, m.podResources.pods, metaServer) + m.metaManager = metaManager + + if err := m.removeContents(m.socketdir); err != nil { + err = fmt.Errorf("[ORM] Fail to clean up stale contents under %s: %v", m.socketdir, err) + klog.Error(err) + return nil, err + } + klog.V(5).Infof("removeContents......") + + return m, nil +} + +func (m *ManagerImpl) Run(ctx context.Context) { + klog.V(2).Infof("[ORM] running...") + m.ctx = ctx + + // read data from checkpoint + err := m.readCheckpoint() + if err != nil { + klog.Fatalf("[ORM] 
read checkpoint fail: %v", err) + } + + if err = os.MkdirAll(m.socketdir, 0750); err != nil { + klog.Fatalf("[ORM] Mkdir socketdir %v fail: %v", m.socketdir, err) + } + if selinux.GetEnabled() { + if err := selinux.SetFileLabel(m.socketdir, KubeletPluginsDirSELinuxLabel); err != nil { + klog.Warningf("[ORM] Unprivileged containerized plugins might not work. Could not set selinux context on %s: %v", m.socketdir, err) + } + } + + socketPath := filepath.Join(m.socketdir, m.socketname) + s, err := net.Listen("unix", socketPath) + if err != nil { + klog.Fatalf(errListenSocket+" %v", err) + } + + m.wg.Add(1) + m.server = grpc.NewServer([]grpc.ServerOption{}...) + + pluginapi.RegisterRegistrationServer(m.server, m) + + klog.V(2).Infof("[ORM] Serving resource plugin registration server on %q", socketPath) + go func() { + defer func() { + m.wg.Done() + + if err := recover(); err != nil { + klog.Fatalf("[ORM] Start recover from err: %v", err) + } + }() + m.server.Serve(s) + }() + + klog.V(5).Infof("[ORM] start serve socketPath %v", socketPath) + go func() { + m.process() + }() + + go wait.Until(m.reconcile, m.reconcilePeriod, m.ctx.Done()) + + m.metaManager.RegistPodAddedFunc(m.onPodAdd) + m.metaManager.RegistPodDeletedFunc(m.onPodDelete) + + m.metaManager.Run(ctx, m.reconcilePeriod) +} + +func (m *ManagerImpl) GetHandlerType() string { + return pluginregistration.ResourcePlugin +} + +func (m *ManagerImpl) onPodAdd(podUID string) { + klog.V(5).Infof("[ORM] onPodAdd: %v", podUID) + + timeout, cancel := context.WithTimeout(m.ctx, 1*time.Second) + defer cancel() + + select { + case m.podAddChan <- podUID: + + case <-timeout.Done(): + klog.Errorf("[ORM] add pod timeout: %v", podUID) + _ = m.emitter.StoreInt64(MetricAddPodTimeout, 1, metrics.MetricTypeNameRaw) + } +} + +func (m *ManagerImpl) onPodDelete(podUID string) { + klog.V(5).Infof("[ORM] onPodDelete: %v", podUID) + + timeout, cancel := context.WithTimeout(m.ctx, 1*time.Second) + defer cancel() + + select { + case 
m.podDeleteChan <- podUID: + + case <-timeout.Done(): + klog.Errorf("[ORM] delete pod timeout: %v", podUID) + _ = m.emitter.StoreInt64(MetricDeletePodTImeout, 1, metrics.MetricTypeNameRaw) + } +} + +func (m *ManagerImpl) process() { + klog.Infof("[ORM] start process...") + + for { + select { + case podUID := <-m.podAddChan: + err := m.processAddPod(podUID) + if err != nil { + klog.Errorf("[ORM] processAddPod fail, podUID: %v, err: %v", podUID, err) + } + + case podUID := <-m.podDeleteChan: + err := m.processDeletePod(podUID) + if err != nil { + klog.Errorf("[ORM] processDeletePod fail, podUID: %v, err: %v", podUID, err) + } + + case <-m.ctx.Done(): + klog.Infof("[ORM] ctx done, exit") + return + } + } +} + +func (m *ManagerImpl) processAddPod(podUID string) error { + pod, err := m.metaManager.MetaServer.GetPod(m.ctx, podUID) + if err != nil { + klog.Errorf("[ORM] processAddPod getPod fail, podUID: %v, err: %v", podUID, err) + return err + } + + for _, container := range append(pod.Spec.InitContainers, pod.Spec.Containers...) 
{ + err = m.addContainer(pod, &container) + if err != nil { + klog.Errorf("[ORM] add container fail, pod: %v, container: %v, err: %v", pod.Name, container.Name, err) + return err + } + + _ = m.syncContainer(pod, &container) + } + + return nil +} + +func (m *ManagerImpl) processDeletePod(podUID string) error { + allSuccess := true + + m.mutex.Lock() + for resourceName, endpoint := range m.endpoints { + _, err := endpoint.E.RemovePod(m.ctx, &pluginapi.RemovePodRequest{ + PodUid: podUID, + }) + + if err != nil { + allSuccess = false + klog.Errorf("[ORM] plugin %v remove pod %v fail: %v", resourceName, podUID, err) + } + } + m.mutex.Unlock() + + if allSuccess { + m.podResources.deletePod(podUID) + } + + return m.writeCheckpoint() +} + +func (m *ManagerImpl) addContainer(pod *v1.Pod, container *v1.Container) error { + klog.V(5).Infof("[ORM] addContainer, pod: %v, container: %v", pod.Name, container.Name) + + systemCores, err := isPodKatalystQoSLevelSystemCores(m.qosConfig, pod) + if err != nil { + klog.Errorf("[ORM] check pod %s qos level fail: %v", pod.Name, err) + return err + } + + if native.CheckDaemonPod(pod) && !systemCores { + klog.Infof("[ORM] skip pod: %s/%s, container: %s resource allocation", + pod.Namespace, pod.Name, container.Name) + return nil + } + + for k, v := range container.Resources.Requests { + resource, err := m.getMappedResourceName(string(k), container.Resources.Requests) + if err != nil { + klog.Errorf("resource %s getMappedResourceName fail: %v", string(k), err) + return err + } + + m.mutex.Lock() + e, ok := m.endpoints[resource] + m.mutex.Unlock() + if !ok { + klog.V(5).Infof("[ORM] addContainer resource %s not supported", resource) + continue + } + + containerType, containerIndex, err := GetContainerTypeAndIndex(pod, container) + if err != nil { + return err + } + + resourceReq := &pluginapi.ResourceRequest{ + PodUid: string(pod.UID), + PodNamespace: pod.GetNamespace(), + PodName: pod.GetName(), + ContainerName: container.Name, + 
ContainerType: containerType, + ContainerIndex: containerIndex, + // PodRole and PodType should be identified by more general annotations + PodRole: pod.Labels[pluginapi.PodRoleLabelKey], + PodType: pod.Annotations[pluginapi.PodTypeAnnotationKey], + // use mapped resource name in "ResourceName" to indicates which endpoint to request + ResourceName: resource, + // use original requested resource name in "ResourceRequests" in order to make plugin identity real requested resource name + ResourceRequests: map[string]float64{resource: v.AsApproximateFloat64()}, + Labels: maputil.CopySS(pod.Labels), + Annotations: maputil.CopySS(pod.Annotations), + // hint is not used in ORM but it can not be nil + Hint: &pluginapi.TopologyHint{}, + } + + response, err := e.E.Allocate(m.ctx, resourceReq) + if err != nil { + err = fmt.Errorf("[ORM] addContainer allocate fail, pod %v, container %v, err: %v", pod.Name, container.Name, err) + klog.Error(err) + return err + } + + if response.AllocationResult == nil { + klog.Warningf("[ORM] allocate for pod %v container %v resource %v got nil allocation result", pod.Name, container.Name, resource) + continue + } + + // update + m.UpdatePodResources(response.AllocationResult.ResourceAllocation, pod, container, resource) + } + + // write checkpoint + return m.writeCheckpoint() +} + +func (m *ManagerImpl) syncContainer(pod *v1.Pod, container *v1.Container) error { + klog.Infof("[ORM] syncContainer, pod: %v, container: %v", pod.Name, container.Name) + containerAllResources := m.podResources.containerAllResources(string(pod.UID), container.Name) + if containerAllResources == nil { + klog.V(5).Infof("got pod %v container %v resources nil", pod.Name, container.Name) + return nil + } + + err := m.resourceExecutor.UpdateContainerResources(pod, container, containerAllResources) + if err != nil { + klog.Errorf("[ORM] UpdateContainerResources fail, pod: %v, container: %v, err: %v", pod.Name, container.Name, err) + return err + } + + return nil +} + +func 
(m *ManagerImpl) reconcile() { + klog.V(5).Infof("[ORM] reconcile...") + resourceAllocationResps := make(map[string]*pluginapi.GetResourcesAllocationResponse) + activePods, err := m.metaManager.MetaServer.GetPodList(m.ctx, native.PodIsActive) + if err != nil { + klog.Errorf("[ORM] getPodList fail: %v", err) + return + } + + m.mutex.Lock() + for resourceName, e := range m.endpoints { + if e.E.IsStopped() { + klog.Warningf("[ORM] skip getResourceAllocation of resource: %s, because plugin stopped", resourceName) + continue + } else if !e.Opts.NeedReconcile { + klog.V(5).Infof("[ORM] skip getResourceAllocation of resource: %s, because plugin needn't reconciling", resourceName) + continue + } + resp, err := e.E.GetResourceAllocation(m.ctx, &pluginapi.GetResourcesAllocationRequest{}) + if err != nil { + klog.Errorf("[ORM] plugin %s getResourcesAllocation fail: %v", resourceName, err) + continue + } + + resourceAllocationResps[resourceName] = resp + } + m.mutex.Unlock() + + for _, pod := range activePods { + if pod == nil { + continue + } + systemCores, err := isPodKatalystQoSLevelSystemCores(m.qosConfig, pod) + if err != nil { + klog.Errorf("[ORM] check pod %s qos level fail: %v", pod.Name, err) + } + + if native.CheckDaemonPod(pod) && !systemCores { + continue + } + for _, container := range pod.Spec.Containers { + + needsReAllocate := false + for resourceName, resp := range resourceAllocationResps { + if resp == nil { + klog.Warningf("[ORM] resource: %s got nil resourceAllocationResp", resourceName) + continue + } + + isRequested, err := m.IsContainerRequestResource(&container, resourceName) + if err != nil { + klog.Errorf("[ORM] IsContainerRequestResource fail, container %v, resourceName %v, err: %v", container.Name, resourceName, err) + continue + } + + if isRequested { + if resp.PodResources[string(pod.UID)] != nil && resp.PodResources[string(pod.UID)].ContainerResources[container.Name] != nil { + resourceAllocations := 
resp.PodResources[string(pod.UID)].ContainerResources[container.Name] + m.UpdatePodResources(resourceAllocations.ResourceAllocation, pod, &container, resourceName) + } else { + needsReAllocate = true + m.podResources.deleteResourceAllocationInfo(string(pod.UID), container.Name, resourceName) + } + } + } + if needsReAllocate && !isSkippedContainer(pod, &container) { + klog.Infof("[ORM] needs re-allocate resource plugin resources for pod %s/%s, container %s during reconcileState", + pod.Namespace, pod.Name, container.Name) + err = m.addContainer(pod, &container) + if err != nil { + klog.Errorf("[ORM] re addContainer fail, pod %v container %v, err: %v", pod.Name, container.Name, err) + continue + } + } + + _ = m.syncContainer(pod, &container) + } + } + + err = m.writeCheckpoint() + if err != nil { + klog.Errorf("[ORM] writeCheckpoint: %v", err) + } +} + +func (m *ManagerImpl) UpdatePodResources( + resourceAllocation map[string]*pluginapi.ResourceAllocationInfo, + pod *v1.Pod, container *v1.Container, resource string) { + for accResourceName, allocationInfo := range resourceAllocation { + if allocationInfo == nil { + klog.Warningf("[ORM] allocation request for resources %s - accompanying resource: %s for pod: %s/%s, container: %s got nil allocation information", + resource, accResourceName, pod.Namespace, pod.Name, container.Name) + continue + } + + klog.V(4).Infof("[ORM] allocation information for resources %s - accompanying resource: %s for pod: %s/%s, container: %s is %v", + resource, accResourceName, pod.Namespace, pod.Name, container.Name, *allocationInfo) + + m.podResources.insert(string(pod.UID), container.Name, accResourceName, allocationInfo) + } +} + +// getMappedResourceName returns mapped resource name of input "resourceName" in m.resourceNamesMap if there is the mapping entry, +// or it will return input "resourceName". +// If both the input "resourceName" and the mapped resource name are requested, it will return error. 
+func (m *ManagerImpl) getMappedResourceName(resourceName string, requests v1.ResourceList) (string, error) { + if _, found := m.resourceNamesMap[resourceName]; !found { + return resourceName, nil + } + + mappedResourceName := m.resourceNamesMap[resourceName] + + _, foundReq := requests[v1.ResourceName(resourceName)] + _, foundMappedReq := requests[v1.ResourceName(mappedResourceName)] + + if foundReq && foundMappedReq { + return mappedResourceName, fmt.Errorf("both %s and mapped %s are requested", resourceName, mappedResourceName) + } + + klog.V(5).Infof("[ORM] map resource name: %s to %s", resourceName, mappedResourceName) + + return mappedResourceName, nil +} + +func (m *ManagerImpl) IsContainerRequestResource(container *v1.Container, resourceName string) (bool, error) { + if container == nil { + return false, nil + } + + for k := range container.Resources.Requests { + requestedResourceName, err := m.getMappedResourceName(string(k), container.Resources.Requests) + + if err != nil { + return false, err + } + + if requestedResourceName == resourceName { + return true, nil + } + } + + return false, nil +} + +func GetContainerTypeAndIndex(pod *v1.Pod, container *v1.Container) (containerType pluginapi.ContainerType, containerIndex uint64, err error) { + if pod == nil || container == nil { + err = fmt.Errorf("got nil pod: %v or container: %v", pod, container) + return + } + + foundContainer := false + + for i, initContainer := range pod.Spec.InitContainers { + if container.Name == initContainer.Name { + foundContainer = true + containerType = pluginapi.ContainerType_INIT + containerIndex = uint64(i) + break + } + } + + if !foundContainer { + mainContainerName := pod.Annotations[MainContainerNameAnnotationKey] + + if mainContainerName == "" && len(pod.Spec.Containers) > 0 { + mainContainerName = pod.Spec.Containers[0].Name + } + + for i, appContainer := range pod.Spec.Containers { + if container.Name == appContainer.Name { + foundContainer = true + + if container.Name 
== mainContainerName { + containerType = pluginapi.ContainerType_MAIN + } else { + containerType = pluginapi.ContainerType_SIDECAR + } + + containerIndex = uint64(i) + break + } + } + } + + if !foundContainer { + err = fmt.Errorf("GetContainerTypeAndIndex doesn't find container: %s in pod: %s/%s", container.Name, pod.Namespace, pod.Name) + } + + return +} + +func isSkippedContainer(pod *v1.Pod, container *v1.Container) bool { + containerType, _, err := GetContainerTypeAndIndex(pod, container) + + if err != nil { + klog.Errorf("GetContainerTypeAndIndex failed with error: %v", err) + return false + } + + return containerType == pluginapi.ContainerType_INIT +} + +func isPodKatalystQoSLevelSystemCores(qosConfig *generic.QoSConfiguration, pod *v1.Pod) (bool, error) { + qosLevel, err := qosConfig.GetQoSLevelForPod(pod) + if err != nil { + return false, err + } + + return qosLevel == pluginapi.KatalystQoSLevelSystemCores, nil +} diff --git a/pkg/agent/resourcemanager/outofband/manager_test.go b/pkg/agent/resourcemanager/outofband/manager_test.go new file mode 100644 index 000000000..dbd4c1ac3 --- /dev/null +++ b/pkg/agent/resourcemanager/outofband/manager_test.go @@ -0,0 +1,588 @@ +/* +Copyright 2022 The Katalyst Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package outofband + +import ( + "context" + "fmt" + "io/ioutil" + "os" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "google.golang.org/grpc" + v1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/resource" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/util/uuid" + pluginapi "k8s.io/kubelet/pkg/apis/resourceplugin/v1alpha1" + "k8s.io/kubernetes/pkg/kubelet/checkpointmanager" + + katalyst_base "github.com/kubewharf/katalyst-core/cmd/base" + "github.com/kubewharf/katalyst-core/cmd/katalyst-agent/app/options" + "github.com/kubewharf/katalyst-core/pkg/agent/resourcemanager/outofband/endpoint" + "github.com/kubewharf/katalyst-core/pkg/agent/resourcemanager/outofband/executor" + "github.com/kubewharf/katalyst-core/pkg/agent/resourcemanager/outofband/metamanager" + "github.com/kubewharf/katalyst-core/pkg/config" + "github.com/kubewharf/katalyst-core/pkg/config/generic" + "github.com/kubewharf/katalyst-core/pkg/metaserver" + "github.com/kubewharf/katalyst-core/pkg/metaserver/agent/pod" + "github.com/kubewharf/katalyst-core/pkg/metrics" + cgroupmgr "github.com/kubewharf/katalyst-core/pkg/util/cgroup/manager" +) + +func TestProcess(t *testing.T) { + t.Parallel() + + res1 := TestResource{ + resourceName: "domain1.com/resource1", + resourceQuantity: *resource.NewQuantity(int64(2), resource.DecimalSI), + } + res2 := TestResource{ + resourceName: "domain2.com/resource2", + resourceQuantity: *resource.NewQuantity(int64(2), resource.DecimalSI), + } + + testResources := []TestResource{ + res1, + res2, + } + + pods := []*v1.Pod{ + makePod("testPod", v1.ResourceList{ + "domain1.com/resource1": *resource.NewQuantity(2, resource.DecimalSI), + "domain2.com/resource2": *resource.NewQuantity(2, resource.DecimalSI), + }), + makePod("skipPod", v1.ResourceList{ + "domain1.com/resource1": *resource.NewQuantity(2, resource.DecimalSI), + "domain2.com/resource2": *resource.NewQuantity(2, 
resource.DecimalSI), + }), + } + pods[1].OwnerReferences = []metav1.OwnerReference{ + { + Kind: "DaemonSet", + }, + } + + ckDir, err := ioutil.TempDir("", "checkpoint-Test") + assert.NoError(t, err) + defer func() { _ = os.RemoveAll(ckDir) }() + + conf := generateTestConfiguration(ckDir) + metaServer, err := generateTestMetaServer(conf, pods) + assert.NoError(t, err) + metamanager := metamanager.NewManager(metrics.DummyMetrics{}, nil, metaServer) + + checkpointManager, err := checkpointmanager.NewCheckpointManager("/tmp/process") + assert.NoError(t, err) + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + m := &ManagerImpl{ + ctx: ctx, + endpoints: map[string]endpoint.EndpointInfo{}, + socketdir: "/tmp/process", + metaManager: metamanager, + resourceNamesMap: map[string]string{}, + podResources: newPodResourcesChk(), + resourceExecutor: executor.NewExecutor(&cgroupmgr.FakeCgroupManager{}), + checkpointManager: checkpointManager, + podAddChan: make(chan string, 1), + podDeleteChan: make(chan string, 1), + qosConfig: generic.NewQoSConfiguration(), + } + defer func() { _ = os.Remove("/tmp/process/kubelet_qrm_checkpoint") }() + + err = registerEndpointByRes(m, testResources) + assert.NoError(t, err) + + go m.process() + + for _, pod := range pods { + m.onPodAdd(string(pod.UID)) + } + + time.Sleep(1 * time.Second) + containerResources := m.podResources.podResources(string(pods[0].UID)) + assert.NotNil(t, containerResources) + assert.Equal(t, len(containerResources), 1) + + for _, containerResource := range containerResources { + assert.Equal(t, len(containerResource), 2) + allocationInfo, ok := containerResource["domain1.com/resource1"] + assert.True(t, ok) + assert.Equal(t, allocationInfo.OciPropertyName, "CpusetCpus") + + allocationInfo, ok = containerResource["domain2.com/resource2"] + assert.True(t, ok) + assert.Equal(t, allocationInfo.OciPropertyName, "CpusetMems") + } + + containerResources = 
m.podResources.podResources(string(pods[1].UID)) + assert.Nil(t, containerResources) + + // remove pod + for _, pod := range pods { + m.onPodDelete(string(pod.UID)) + } + time.Sleep(500 * time.Millisecond) + assert.Equal(t, len(m.podResources.allAllocatedResourceNames()), 0) + assert.Equal(t, len(m.podResources.pods()), 0) +} + +func TestReconcile(t *testing.T) { + t.Parallel() + + pods := []*v1.Pod{ + makePod("testPod1", v1.ResourceList{ + "cpu": *resource.NewQuantity(2, resource.DecimalSI), + "memory": *resource.NewQuantity(2, resource.DecimalSI), + }), + makePod("testPod2", v1.ResourceList{ + "cpu": *resource.NewQuantity(2, resource.DecimalSI), + "memory": *resource.NewQuantity(2, resource.DecimalSI), + }), + } + + ckDir, err := ioutil.TempDir("", "checkpoint-Test") + assert.NoError(t, err) + defer func() { _ = os.RemoveAll(ckDir) }() + + conf := generateTestConfiguration(ckDir) + metaServer, err := generateTestMetaServer(conf, pods) + assert.NoError(t, err) + metamanager := metamanager.NewManager(metrics.DummyMetrics{}, nil, metaServer) + + checkpointManager, err := checkpointmanager.NewCheckpointManager("/tmp/reconcile") + assert.NoError(t, err) + + m := &ManagerImpl{ + endpoints: map[string]endpoint.EndpointInfo{}, + socketdir: "/tmp/reconcile", + metaManager: metamanager, + resourceNamesMap: map[string]string{ + "domain1.com/resource1": "domain1.com/resource1", + }, + podResources: newPodResourcesChk(), + resourceExecutor: executor.NewExecutor(&cgroupmgr.FakeCgroupManager{}), + checkpointManager: checkpointManager, + podAddChan: make(chan string, 1), + podDeleteChan: make(chan string, 1), + qosConfig: generic.NewQoSConfiguration(), + } + defer func() { _ = os.Remove("/tmp/reconcile/kubelet_qrm_checkpoint") }() + + err = registerEndpointByPods(m, pods) + assert.NoError(t, err) + + m.reconcile() + + assert.Equal(t, len(m.podResources.pods()), 2) + for _, pod := range pods { + containerResources := m.podResources.podResources(string(pod.UID)) + assert.NotNil(t, 
containerResources) + + for _, resourceAllocation := range containerResources { + assert.Equal(t, len(resourceAllocation), 2) + } + } +} + +func TestIsSkippedContainer(t *testing.T) { + t.Parallel() + + testCases := []struct { + Name string + Pod *v1.Pod + Container *v1.Container + Expected bool + }{ + { + Name: "mainContainer", + Pod: &v1.Pod{ + Spec: v1.PodSpec{ + Containers: []v1.Container{ + { + Name: "testContainer", + }, + }, + }, + }, + Container: &v1.Container{ + Name: "testContainer", + }, + Expected: false, + }, + { + Name: "initContainer", + Pod: &v1.Pod{ + Spec: v1.PodSpec{ + InitContainers: []v1.Container{ + { + Name: "testContainer", + }, + }, + }}, + Container: &v1.Container{ + Name: "testContainer", + }, + Expected: true, + }, + { + Name: "fail", + Pod: nil, + Container: nil, + Expected: false, + }, + } + + for _, tc := range testCases { + t.Run(tc.Name, func(t *testing.T) { + res := isSkippedContainer(tc.Pod, tc.Container) + assert.Equal(t, res, tc.Expected) + }) + } +} + +func TestGetMappedResourceName(t *testing.T) { + t.Parallel() + + m := &ManagerImpl{ + resourceNamesMap: map[string]string{ + "test/cpu": "cpu", + "test/memory": "memory", + }, + } + + testCases := []struct { + Name string + resourceName string + requests v1.ResourceList + expectErr bool + expectResource string + }{ + { + Name: "cpu", + resourceName: "test/cpu", + requests: map[v1.ResourceName]resource.Quantity{ + "cpu": *resource.NewQuantity(1, resource.DecimalSI), + }, + expectErr: false, + expectResource: "cpu", + }, + { + Name: "not found", + resourceName: "cpu", + requests: map[v1.ResourceName]resource.Quantity{ + "cpu": *resource.NewQuantity(1, resource.DecimalSI), + }, + expectErr: false, + expectResource: "cpu", + }, + { + Name: "repetition", + resourceName: "test/cpu", + requests: map[v1.ResourceName]resource.Quantity{ + "cpu": *resource.NewQuantity(1, resource.DecimalSI), + "test/cpu": *resource.NewQuantity(1, resource.DecimalSI), + }, + expectErr: true, + 
expectResource: "cpu", + }, + } + + for _, tc := range testCases { + t.Run(tc.Name, func(t *testing.T) { + r, err := m.getMappedResourceName(tc.resourceName, tc.requests) + if tc.expectErr { + assert.NotNil(t, err) + } else { + assert.Nil(t, err) + assert.Equal(t, r, tc.expectResource) + } + }) + } +} + +func TestRun(t *testing.T) { + t.Parallel() + + pods := []*v1.Pod{ + makePod("testPod1", v1.ResourceList{ + "cpu": *resource.NewQuantity(2, resource.DecimalSI), + "memory": *resource.NewQuantity(2, resource.DecimalSI), + }), + makePod("testPod2", v1.ResourceList{ + "cpu": *resource.NewQuantity(2, resource.DecimalSI), + "memory": *resource.NewQuantity(2, resource.DecimalSI), + }), + } + + ckDir, err := ioutil.TempDir("", "checkpoint-Test") + assert.NoError(t, err) + defer func() { _ = os.RemoveAll(ckDir) }() + + conf := generateTestConfiguration(ckDir) + metaServer, err := generateTestMetaServer(conf, pods) + assert.NoError(t, err) + + checkpointManager, err := checkpointmanager.NewCheckpointManager("/tmp/run") + assert.NoError(t, err) + + m := &ManagerImpl{ + reconcilePeriod: 2 * time.Second, + endpoints: map[string]endpoint.EndpointInfo{}, + socketdir: "/tmp/run", + socketname: "tmp.sock", + resourceNamesMap: map[string]string{ + "domain1.com/resource1": "domain1.com/resource1", + }, + podResources: newPodResourcesChk(), + resourceExecutor: executor.NewExecutor(&cgroupmgr.FakeCgroupManager{}), + checkpointManager: checkpointManager, + podAddChan: make(chan string, 1), + podDeleteChan: make(chan string, 1), + qosConfig: generic.NewQoSConfiguration(), + } + defer func() { _ = os.Remove("/tmp/run/kubelet_qrm_checkpoint") }() + defer func() { _ = os.Remove("/tmp/run/tmp.sock") }() + metaManager := metamanager.NewManager(metrics.DummyMetrics{}, m.podResources.pods, metaServer) + m.metaManager = metaManager + + err = registerEndpointByPods(m, pods) + assert.NoError(t, err) + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + m.Run(ctx) + 
time.Sleep(5 * time.Second) + + assert.Equal(t, len(m.podResources.pods()), 2) + assert.Equal(t, len(m.podResources.allAllocatedResourceNames()), 2) +} + +type TestResource struct { + resourceName string + resourceQuantity resource.Quantity +} + +func generateTestMetaServer(conf *config.Configuration, podList []*v1.Pod) (*metaserver.MetaServer, error) { + genericCtx, err := katalyst_base.GenerateFakeGenericContext([]runtime.Object{}) + if err != nil { + return nil, err + } + + ms, err := metaserver.NewMetaServer(genericCtx.Client, metrics.DummyMetrics{}, conf) + if err != nil { + return ms, err + } + ms.PodFetcher = &pod.PodFetcherStub{ + PodList: podList, + } + return ms, nil +} + +func generateTestConfiguration(checkpointDir string) *config.Configuration { + conf, _ := options.NewOptions().Config() + + conf.MetaServerConfiguration.CheckpointManagerDir = checkpointDir + + return conf +} + +func makePod(name string, rl v1.ResourceList) *v1.Pod { + return &v1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + UID: uuid.NewUUID(), + Name: name, + }, + Spec: v1.PodSpec{ + Containers: []v1.Container{ + { + Name: name, + Resources: v1.ResourceRequirements{ + Requests: rl.DeepCopy(), + Limits: rl.DeepCopy(), + }, + }, + }, + }, + } +} + +func registerEndpointByRes(manager *ManagerImpl, testRes []TestResource) error { + if manager == nil { + return fmt.Errorf("registerEndpointByRes got nil manager") + } + + for i, res := range testRes { + var OciPropertyName string + if res.resourceName == "domain1.com/resource1" { + OciPropertyName = "CpusetCpus" + } else if res.resourceName == "domain2.com/resource2" { + OciPropertyName = "CpusetMems" + } + + curResourceName := res.resourceName + + if res.resourceName == "domain1.com/resource1" || res.resourceName == "domain2.com/resource2" { + manager.registerEndpoint(curResourceName, &pluginapi.ResourcePluginOptions{ + PreStartRequired: true, + WithTopologyAlignment: true, + NeedReconcile: true, + }, &MockEndpoint{ + allocateFunc: func(req 
*pluginapi.ResourceRequest) (*pluginapi.ResourceAllocationResponse, error) { + if req == nil { + return nil, fmt.Errorf("allocateFunc got nil request") + } + + resp := new(pluginapi.ResourceAllocationResponse) + resp.AllocationResult = new(pluginapi.ResourceAllocation) + resp.AllocationResult.ResourceAllocation = make(map[string]*pluginapi.ResourceAllocationInfo) + resp.AllocationResult.ResourceAllocation[curResourceName] = new(pluginapi.ResourceAllocationInfo) + resp.AllocationResult.ResourceAllocation[curResourceName].Envs = make(map[string]string) + resp.AllocationResult.ResourceAllocation[curResourceName].Envs[fmt.Sprintf("key%d", i)] = fmt.Sprintf("val%d", i) + resp.AllocationResult.ResourceAllocation[curResourceName].Annotations = make(map[string]string) + resp.AllocationResult.ResourceAllocation[curResourceName].Annotations[fmt.Sprintf("key%d", i)] = fmt.Sprintf("val%d", i) + resp.AllocationResult.ResourceAllocation[curResourceName].IsScalarResource = true + resp.AllocationResult.ResourceAllocation[curResourceName].IsNodeResource = true + resp.AllocationResult.ResourceAllocation[curResourceName].AllocatedQuantity = req.ResourceRequests[curResourceName] + resp.AllocationResult.ResourceAllocation[curResourceName].AllocationResult = "0-1" + resp.AllocationResult.ResourceAllocation[curResourceName].OciPropertyName = OciPropertyName + return resp, nil + }, + }) + } else if res.resourceName == "domain3.com/resource3" { + manager.registerEndpoint(curResourceName, &pluginapi.ResourcePluginOptions{ + PreStartRequired: true, + WithTopologyAlignment: true, + NeedReconcile: true, + }, &MockEndpoint{ + allocateFunc: func(req *pluginapi.ResourceRequest) (*pluginapi.ResourceAllocationResponse, error) { + return nil, fmt.Errorf("mock error") + }, + }) + } + } + + return nil +} + +func registerEndpointByPods(manager *ManagerImpl, pods []*v1.Pod) error { + + for _, resource := range []string{"cpu", "memory"} { + resp := &pluginapi.GetResourcesAllocationResponse{ + 
PodResources: map[string]*pluginapi.ContainerResources{}, + } + for _, pod := range pods { + uid := string(pod.UID) + if _, ok := resp.PodResources[uid]; !ok { + resp.PodResources[uid] = &pluginapi.ContainerResources{ + ContainerResources: map[string]*pluginapi.ResourceAllocation{}, + } + } + for _, container := range pod.Spec.Containers { + if _, ok := resp.PodResources[uid].ContainerResources[container.Name]; !ok { + resp.PodResources[uid].ContainerResources[container.Name] = &pluginapi.ResourceAllocation{ + ResourceAllocation: map[string]*pluginapi.ResourceAllocationInfo{}, + } + } + + for resourceName, quantity := range container.Resources.Requests { + if resourceName.String() == resource { + resp.PodResources[uid].ContainerResources[container.Name].ResourceAllocation[string(resourceName)] = &pluginapi.ResourceAllocationInfo{ + IsNodeResource: true, + IsScalarResource: false, + AllocatedQuantity: float64(quantity.Value()), + AllocationResult: "0-1", + } + } + } + } + } + + manager.registerEndpoint(resource, &pluginapi.ResourcePluginOptions{ + NeedReconcile: true, + }, &MockEndpoint{ + resourceAlloc: func(ctx context.Context, request *pluginapi.GetResourcesAllocationRequest) (*pluginapi.GetResourcesAllocationResponse, error) { + return resp, nil + }, + allocateFunc: func(resourceRequest *pluginapi.ResourceRequest) (*pluginapi.ResourceAllocationResponse, error) { + return &pluginapi.ResourceAllocationResponse{}, nil + }, + }) + } + + return nil +} + +/* ------------------ mock endpoint for test ----------------------- */ +type MockEndpoint struct { + allocateFunc func(resourceRequest *pluginapi.ResourceRequest) (*pluginapi.ResourceAllocationResponse, error) + resourceAlloc func(ctx context.Context, request *pluginapi.GetResourcesAllocationRequest) (*pluginapi.GetResourcesAllocationResponse, error) + stopTime time.Time +} + +func (m *MockEndpoint) Stop() { + m.stopTime = time.Now() +} +func (m *MockEndpoint) run(success chan<- bool) {} + +func (m *MockEndpoint) 
Allocate(ctx context.Context, resourceRequest *pluginapi.ResourceRequest) (*pluginapi.ResourceAllocationResponse, error) { + if m.IsStopped() { + return nil, fmt.Errorf("endpoint %v has been stopped", m) + } + if m.allocateFunc != nil { + return m.allocateFunc(resourceRequest) + } + return nil, nil +} + +func (m *MockEndpoint) IsStopped() bool { + return !m.stopTime.IsZero() +} + +var SGP int = 0 + +func (m *MockEndpoint) StopGracePeriodExpired() bool { + if SGP == 0 { + return false + } else { + return true + } +} + +func (m *MockEndpoint) RemovePod(ctx context.Context, removePodRequest *pluginapi.RemovePodRequest) (*pluginapi.RemovePodResponse, error) { + return nil, nil +} + +func (m *MockEndpoint) GetResourceAllocation(ctx context.Context, request *pluginapi.GetResourcesAllocationRequest) (*pluginapi.GetResourcesAllocationResponse, error) { + if m.resourceAlloc != nil { + return m.resourceAlloc(ctx, request) + } + return nil, nil +} + +func (m *MockEndpoint) GetResourcePluginOptions(ctx context.Context, in *pluginapi.Empty, opts ...grpc.CallOption) (*pluginapi.ResourcePluginOptions, error) { + return &pluginapi.ResourcePluginOptions{ + NeedReconcile: true, + }, nil +} diff --git a/pkg/agent/resourcemanager/outofband/metamanager/manager.go b/pkg/agent/resourcemanager/outofband/metamanager/manager.go new file mode 100644 index 000000000..20b0f205e --- /dev/null +++ b/pkg/agent/resourcemanager/outofband/metamanager/manager.go @@ -0,0 +1,193 @@ +/* +Copyright 2022 The Katalyst Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+See the License for the specific language governing permissions and +limitations under the License. +*/ + +package metamanager + +import ( + "context" + "sync" + "time" + + v1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/util/wait" + "k8s.io/klog/v2" + + "github.com/kubewharf/katalyst-core/pkg/metaserver" + "github.com/kubewharf/katalyst-core/pkg/metrics" + "github.com/kubewharf/katalyst-core/pkg/util/cgroup/common" + "github.com/kubewharf/katalyst-core/pkg/util/native" +) + +type Manager struct { + ctx context.Context + + emitter metrics.MetricEmitter + + *metaserver.MetaServer + mutex sync.RWMutex + + cachedPods CachedPodListFunc + + podFirstRemoveTime map[string]time.Time + + podAddedFuncs []PodAddedFunc + podDeletedFuncs []PodDeletedFunc +} + +func NewManager( + emitter metrics.MetricEmitter, + cachedPods CachedPodListFunc, + metaServer *metaserver.MetaServer) *Manager { + m := &Manager{ + emitter: emitter, + MetaServer: metaServer, + cachedPods: cachedPods, + podAddedFuncs: make([]PodAddedFunc, 0), + podDeletedFuncs: make([]PodDeletedFunc, 0), + podFirstRemoveTime: make(map[string]time.Time), + } + return m +} + +func (m *Manager) Run(ctx context.Context, reconcilePeriod time.Duration) { + m.ctx = ctx + go wait.Until(m.reconcile, reconcilePeriod, m.ctx.Done()) +} + +func (m *Manager) reconcile() { + + activePods, err := m.MetaServer.GetPodList(m.ctx, native.PodIsActive) + if err != nil { + klog.Errorf("metamanager reconcile GetPodList fail: %v", err) + _ = m.emitter.StoreInt64(MetricReconcileFail, 1, metrics.MetricTypeNameRaw) + return + } + + // reconcile new pods + podsToBeAdded := m.reconcileNewPods(activePods) + if len(podsToBeAdded) > 0 { + m.notifyAddPods(podsToBeAdded) + } + + // reconcile pod terminated and had been deleted + podsTobeRemoved := m.reconcileRemovePods(activePods) + if len(podsTobeRemoved) > 0 { + m.notifyDeletePods(podsTobeRemoved) + } +} + +func (m *Manager) RegistPodAddedFunc(podAddedFunc PodAddedFunc) { + m.podAddedFuncs = 
append(m.podAddedFuncs, podAddedFunc) +} + +func (m *Manager) RegistPodDeletedFunc(podDeletedFunc PodDeletedFunc) { + m.podDeletedFuncs = append(m.podDeletedFuncs, podDeletedFunc) +} + +// reconcileNewPods checks new pods between activePods from metaServer and pods in manager cache +func (m *Manager) reconcileNewPods(activePods []*v1.Pod) []string { + podsToBeAdded := make([]string, 0) + podList := m.cachedPods() + + for _, pod := range activePods { + if !podList.Has(string(pod.UID)) { + podsToBeAdded = append(podsToBeAdded, string(pod.UID)) + } + } + + return podsToBeAdded +} + +// reconcileRemovePods checks deleted pods between activePods from metaServer and pods in manager cache +func (m *Manager) reconcileRemovePods(activePods []*v1.Pod) map[string]struct{} { + podsToBeRemoved := make(map[string]struct{}) + podList := m.cachedPods() + + for _, pod := range activePods { + if podList.Has(string(pod.UID)) { + podList = podList.Delete(string(pod.UID)) + } + } + + // gc pod remove timestamp + m.mutex.Lock() + for _, pod := range activePods { + delete(m.podFirstRemoveTime, string(pod.UID)) + } + m.mutex.Unlock() + + // check pod can be removed + for _, podUID := range podList.UnsortedList() { + if m.canPodDelete(podUID) { + podsToBeRemoved[podUID] = struct{}{} + } + } + + return podsToBeRemoved +} + +func (m *Manager) notifyAddPods(podUIDs []string) { + if len(m.podAddedFuncs) > 0 { + klog.V(5).Infof("metaManager notifyAddPods: %v", podUIDs) + + for _, podUID := range podUIDs { + for _, addFunc := range m.podAddedFuncs { + addFunc(podUID) + } + } + } +} + +func (m *Manager) notifyDeletePods(podUIDSet map[string]struct{}) { + if len(m.podDeletedFuncs) > 0 { + klog.V(5).Infof("metaManager notifyDeletePods: %v", podUIDSet) + + for podUID := range podUIDSet { + for _, deleteFuncs := range m.podDeletedFuncs { + deleteFuncs(podUID) + } + } + } +} + +func (m *Manager) canPodDelete(podUID string) bool { + m.mutex.Lock() + defer m.mutex.Unlock() + // generate pod cgroup path, 
use cpu as subsystem + _, err := common.GetPodAbsCgroupPath(common.CgroupSubsysCPU, podUID) + if err != nil { + // GetPodAbsCgroupPath return error only if pod cgroup path not exist + klog.Warning(err.Error()) + delete(m.podFirstRemoveTime, podUID) + return true + } + + // pod is not exist in metaServer, deletionTimestamp can not be got by pod + // first deletion check time should be record + firstRemoveTime, ok := m.podFirstRemoveTime[podUID] + if !ok { + m.podFirstRemoveTime[podUID] = time.Now() + } else { + if time.Now().After(firstRemoveTime.Add(forceRemoveDuration)) { + delete(m.podFirstRemoveTime, podUID) + return true + } + + return false + } + + return false +} diff --git a/pkg/agent/resourcemanager/outofband/metamanager/manager_test.go b/pkg/agent/resourcemanager/outofband/metamanager/manager_test.go new file mode 100644 index 000000000..25ac233c8 --- /dev/null +++ b/pkg/agent/resourcemanager/outofband/metamanager/manager_test.go @@ -0,0 +1,106 @@ +/* +Copyright 2022 The Katalyst Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package metamanager + +import ( + "io/ioutil" + "os" + "testing" + + "github.com/stretchr/testify/require" + v1 "k8s.io/api/core/v1" + v12 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/util/sets" + + katalyst_base "github.com/kubewharf/katalyst-core/cmd/base" + "github.com/kubewharf/katalyst-core/cmd/katalyst-agent/app/options" + "github.com/kubewharf/katalyst-core/pkg/config" + "github.com/kubewharf/katalyst-core/pkg/metaserver" + "github.com/kubewharf/katalyst-core/pkg/metaserver/agent/pod" + "github.com/kubewharf/katalyst-core/pkg/metrics" +) + +func generateTestMetaServer(conf *config.Configuration) (*metaserver.MetaServer, error) { + genericCtx, err := katalyst_base.GenerateFakeGenericContext([]runtime.Object{}) + if err != nil { + return nil, err + } + + return metaserver.NewMetaServer(genericCtx.Client, metrics.DummyMetrics{}, conf) +} + +func generateTestConfiguration(checkpointDir string) *config.Configuration { + conf, _ := options.NewOptions().Config() + + conf.MetaServerConfiguration.CheckpointManagerDir = checkpointDir + + return conf +} + +func TestReconcile(t *testing.T) { + t.Parallel() + + ckDir, err := ioutil.TempDir("", "checkpoint-Test") + require.NoError(t, err) + defer func() { _ = os.RemoveAll(ckDir) }() + + conf := generateTestConfiguration(ckDir) + metaServer, err := generateTestMetaServer(conf) + require.NoError(t, err) + + metaServer.PodFetcher = &pod.PodFetcherStub{ + PodList: []*v1.Pod{ + { + ObjectMeta: v12.ObjectMeta{ + Name: "pod0", + UID: "pod0", + }, + }, + { + ObjectMeta: v12.ObjectMeta{ + Name: "pod1", + UID: "pod1", + }, + }, + { + ObjectMeta: v12.ObjectMeta{ + Name: "pod2", + UID: "pod2", + }, + }, + }, + } + + manager := NewManager(metrics.DummyMetrics{}, func() sets.String { + return sets.NewString("pod0", "pod3", "pod4", "pod5") + }, metaServer) + + newPodList := make([]string, 0) + removePodList := make([]string, 0) + + manager.RegistPodAddedFunc(func(podUID 
string) { + newPodList = append(newPodList, podUID) + }) + manager.RegistPodDeletedFunc(func(podUID string) { + removePodList = append(removePodList, podUID) + }) + + manager.reconcile() + require.Equal(t, 2, len(newPodList)) + require.Equal(t, 3, len(removePodList)) +} diff --git a/pkg/agent/resourcemanager/outofband/metamanager/types.go b/pkg/agent/resourcemanager/outofband/metamanager/types.go new file mode 100644 index 000000000..83eb5b296 --- /dev/null +++ b/pkg/agent/resourcemanager/outofband/metamanager/types.go @@ -0,0 +1,35 @@ +/* +Copyright 2022 The Katalyst Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package metamanager + +import ( + "time" + + "k8s.io/apimachinery/pkg/util/sets" +) + +const ( + MetricReconcileFail = "metamanager_reconcile_fail" + + forceRemoveDuration = 5 * time.Minute +) + +type PodAddedFunc func(podUID string) + +type PodDeletedFunc func(podUID string) + +type CachedPodListFunc func() sets.String diff --git a/pkg/agent/resourcemanager/outofband/pluginhandler.go b/pkg/agent/resourcemanager/outofband/pluginhandler.go new file mode 100644 index 000000000..052304ff8 --- /dev/null +++ b/pkg/agent/resourcemanager/outofband/pluginhandler.go @@ -0,0 +1,177 @@ +/* +Copyright 2022 The Katalyst Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. 
+You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package outofband + +import ( + "context" + "fmt" + "os" + "path/filepath" + + "k8s.io/apimachinery/pkg/util/errors" + "k8s.io/klog/v2" + pluginapi "k8s.io/kubelet/pkg/apis/resourceplugin/v1alpha1" + + endpoint2 "github.com/kubewharf/katalyst-core/pkg/agent/resourcemanager/outofband/endpoint" +) + +func (m *ManagerImpl) removeContents(dir string) error { + d, err := os.Open(dir) + if err != nil { + return err + } + defer d.Close() + names, err := d.Readdirnames(-1) + if err != nil { + return err + } + var errs []error + for _, name := range names { + filePath := filepath.Join(dir, name) + if filePath == m.checkpointFile() { + continue + } + stat, err := os.Stat(filePath) + if err != nil { + klog.Errorf("[ORM] Failed to stat file %s: %v", filePath, err) + continue + } + if stat.IsDir() { + continue + } + err = os.RemoveAll(filePath) + if err != nil { + errs = append(errs, err) + klog.Errorf("[ORM] Failed to remove file %s: %v", filePath, err) + continue + } + } + return errors.NewAggregate(errs) +} + +// ValidatePlugin validates a plugin if the version is correct and the name has the format of an extended resource +func (m *ManagerImpl) ValidatePlugin(pluginName string, endpoint string, versions []string) error { + klog.V(2).Infof("Got Plugin %s at endpoint %s with versions %v", pluginName, endpoint, versions) + + if !m.isVersionCompatibleWithPlugin(versions) { + return fmt.Errorf("manager version, %s, is not among plugin supported versions %v", pluginapi.Version, versions) + } + + return nil +} + +// RegisterPlugin starts the endpoint and registers it +func (m 
*ManagerImpl) RegisterPlugin(pluginName string, endpoint string, versions []string) error { + klog.V(2).Infof("[ORM] Registering Plugin %s at endpoint %s", pluginName, endpoint) + + e, err := endpoint2.NewEndpointImpl(endpoint, pluginName) + if err != nil { + return fmt.Errorf("[ORM] failed to dial resource plugin with socketPath %s: %v", endpoint, err) + } + + options, err := e.GetResourcePluginOptions(context.Background(), &pluginapi.Empty{}) + if err != nil { + return fmt.Errorf("[ORM] failed to get resource plugin options: %v", err) + } + + m.registerEndpoint(pluginName, options, e) + + return nil +} + +// DeRegisterPlugin deregisters the plugin +func (m *ManagerImpl) DeRegisterPlugin(pluginName string) { + m.mutex.Lock() + defer m.mutex.Unlock() + + if eI, ok := m.endpoints[pluginName]; ok { + eI.E.Stop() + } +} + +func (m *ManagerImpl) registerEndpoint(resourceName string, options *pluginapi.ResourcePluginOptions, e endpoint2.Endpoint) { + m.mutex.Lock() + defer m.mutex.Unlock() + + old, ok := m.endpoints[resourceName] + + if ok && !old.E.IsStopped() { + klog.V(2).Infof("[ORM] stop old endpoint: %v", old.E) + old.E.Stop() + } + + m.endpoints[resourceName] = endpoint2.EndpointInfo{E: e, Opts: options} + klog.V(2).Infof("[ORM] Registered endpoint %v", e) +} + +func (m *ManagerImpl) isVersionCompatibleWithPlugin(versions []string) bool { + for _, version := range versions { + for _, supportedVersion := range pluginapi.SupportedVersions { + if version == supportedVersion { + return true + } + } + } + return false +} + +// Register registers a resource plugin. 
+func (m *ManagerImpl) Register(ctx context.Context, r *pluginapi.RegisterRequest) (*pluginapi.Empty, error) { + klog.Infof("[ORM] Got registration request from resource plugin with resource name %q", r.ResourceName) + var versionCompatible bool + for _, v := range pluginapi.SupportedVersions { + if r.Version == v { + versionCompatible = true + break + } + } + if !versionCompatible { + errorString := fmt.Sprintf(errUnsupportedVersion, r.Version, pluginapi.SupportedVersions) + klog.Infof("Bad registration request from resource plugin with resource name %q: %s", r.ResourceName, errorString) + return &pluginapi.Empty{}, fmt.Errorf(errorString) + } + + // TODO: for now, always accepts newest resource plugin. Later may consider to + // add some policies here, e.g., verify whether an old resource plugin with the + // same resource name is still alive to determine whether we want to accept + // the new registration. + success := make(chan bool) + go m.addEndpoint(r, success) + select { + case pass := <-success: + if pass { + klog.Infof("[ORM] Register resource plugin for %s success", r.ResourceName) + return &pluginapi.Empty{}, nil + } + klog.Errorf("[ORM] Register resource plugin for %s fail", r.ResourceName) + return &pluginapi.Empty{}, fmt.Errorf("failed to register resource %s", r.ResourceName) + case <-ctx.Done(): + klog.Errorf("[ORM] Register resource plugin for %s timeout", r.ResourceName) + return &pluginapi.Empty{}, fmt.Errorf("timeout to register resource %s", r.ResourceName) + } +} + +func (m *ManagerImpl) addEndpoint(r *pluginapi.RegisterRequest, success chan<- bool) { + new, err := endpoint2.NewEndpointImpl(filepath.Join(m.socketdir, r.Endpoint), r.ResourceName) + if err != nil { + klog.Errorf("[qosresourcemanager] Failed to dial resource plugin with request %v: %v", r, err) + success <- false + return + } + m.registerEndpoint(r.ResourceName, r.Options, new) + success <- true +} diff --git a/pkg/agent/resourcemanager/outofband/pod_resource.go 
b/pkg/agent/resourcemanager/outofband/pod_resource.go new file mode 100644 index 000000000..86a4280c8 --- /dev/null +++ b/pkg/agent/resourcemanager/outofband/pod_resource.go @@ -0,0 +1,227 @@ +/* +Copyright 2022 The Katalyst Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package outofband + +import ( + "reflect" + "sync" + + //nolint + "github.com/golang/protobuf/proto" + "k8s.io/apimachinery/pkg/util/sets" + "k8s.io/klog/v2" + pluginapi "k8s.io/kubelet/pkg/apis/resourceplugin/v1alpha1" + + "github.com/kubewharf/katalyst-core/pkg/agent/resourcemanager/outofband/checkpoint" +) + +type ResourceAllocation map[string]*pluginapi.ResourceAllocationInfo // Keyed by resourceName. +type ContainerResources map[string]ResourceAllocation // Keyed by containerName. +type PodResources map[string]ContainerResources // Keyed by podUID + +type podResourcesChk struct { + sync.RWMutex + resources PodResources // Keyed by podUID. 
+} + +var EmptyValue = reflect.Value{} + +func newPodResourcesChk() *podResourcesChk { + return &podResourcesChk{ + resources: make(PodResources), + } +} + +func (pr PodResources) DeepCopy() PodResources { + copiedPodResources := make(PodResources) + + for podUID, containerResources := range pr { + copiedPodResources[podUID] = containerResources.DeepCopy() + } + + return copiedPodResources +} + +func (cr ContainerResources) DeepCopy() ContainerResources { + copiedContainerResources := make(ContainerResources) + + for containerName, resouceAllocation := range cr { + copiedContainerResources[containerName] = resouceAllocation.DeepCopy() + } + + return copiedContainerResources +} + +func (ra ResourceAllocation) DeepCopy() ResourceAllocation { + copiedResourceAllocation := make(ResourceAllocation) + + for resourceName, allocationInfo := range ra { + copiedResourceAllocation[resourceName] = proto.Clone(allocationInfo).(*pluginapi.ResourceAllocationInfo) + } + + return copiedResourceAllocation +} + +func (pres *podResourcesChk) pods() sets.String { + pres.RLock() + defer pres.RUnlock() + + ret := sets.NewString() + for k := range pres.resources { + ret.Insert(k) + } + return ret +} + +// "resourceName" here is different than "resourceName" in qrm allocation, one qrm plugin may +// only represent one resource in allocation, but can also return several other resourceNames +// to store in pod resources +func (pres *podResourcesChk) insert(podUID, contName, resourceName string, allocationInfo *pluginapi.ResourceAllocationInfo) { + if allocationInfo == nil { + return + } + + pres.Lock() + defer pres.Unlock() + + if _, podExists := pres.resources[podUID]; !podExists { + pres.resources[podUID] = make(ContainerResources) + } + if _, contExists := pres.resources[podUID][contName]; !contExists { + pres.resources[podUID][contName] = make(ResourceAllocation) + } + + pres.resources[podUID][contName][resourceName] = proto.Clone(allocationInfo).(*pluginapi.ResourceAllocationInfo) +} + 
+func (pres *podResourcesChk) deleteResourceAllocationInfo(podUID, contName, resourceName string) { + pres.Lock() + defer pres.Unlock() + + if pres.resources[podUID] != nil && pres.resources[podUID][contName] != nil { + delete(pres.resources[podUID][contName], resourceName) + } +} + +func (pres *podResourcesChk) deletePod(podUID string) { + pres.Lock() + defer pres.Unlock() + + if pres.resources == nil { + return + } + + delete(pres.resources, podUID) +} + +func (pres *podResourcesChk) delete(pods []string) { + pres.Lock() + defer pres.Unlock() + + if pres.resources == nil { + return + } + + for _, uid := range pods { + delete(pres.resources, uid) + } +} + +func (pres *podResourcesChk) podResources(podUID string) ContainerResources { + pres.RLock() + defer pres.RUnlock() + + if _, podExists := pres.resources[podUID]; !podExists { + return nil + } + + return pres.resources[podUID] +} + +// Returns all resources information allocated to the given container. +// Returns nil if we don't have cached state for the given . +func (pres *podResourcesChk) containerAllResources(podUID, contName string) ResourceAllocation { + pres.RLock() + defer pres.RUnlock() + + if _, podExists := pres.resources[podUID]; !podExists { + return nil + } + if _, contExists := pres.resources[podUID][contName]; !contExists { + return nil + } + + return pres.resources[podUID][contName].DeepCopy() +} + +// Turns podResourcesChk to checkpointData. 
+func (pres *podResourcesChk) toCheckpointData() []checkpoint.PodResourcesEntry { + pres.RLock() + defer pres.RUnlock() + + var data []checkpoint.PodResourcesEntry + for podUID, containerResources := range pres.resources { + for conName, resourcesAllocation := range containerResources { + for resourceName, allocationInfo := range resourcesAllocation { + allocRespBytes, err := allocationInfo.Marshal() + if err != nil { + klog.Errorf("Can't marshal allocationInfo for %v %v %v: %v", podUID, conName, resourceName, err) + continue + } + data = append(data, checkpoint.PodResourcesEntry{ + PodUID: podUID, + ContainerName: conName, + ResourceName: resourceName, + AllocationInfo: string(allocRespBytes)}) + } + } + } + return data +} + +// Populates podResourcesChk from the passed in checkpointData. +func (pres *podResourcesChk) fromCheckpointData(data []checkpoint.PodResourcesEntry) { + for _, entry := range data { + klog.V(2).Infof("Get checkpoint entry: %s %s %s %s\n", + entry.PodUID, entry.ContainerName, entry.ResourceName, entry.AllocationInfo) + allocationInfo := &pluginapi.ResourceAllocationInfo{} + err := allocationInfo.Unmarshal([]byte(entry.AllocationInfo)) + if err != nil { + klog.Errorf("Can't unmarshal allocationInfo for %s %s %s %s: %v", + entry.PodUID, entry.ContainerName, entry.ResourceName, entry.AllocationInfo, err) + continue + } + pres.insert(entry.PodUID, entry.ContainerName, entry.ResourceName, allocationInfo) + } +} + +func (pres *podResourcesChk) allAllocatedResourceNames() sets.String { + pres.RLock() + defer pres.RUnlock() + + res := sets.NewString() + + for _, containerResources := range pres.resources { + for _, resourcesAllocation := range containerResources { + for resourceName := range resourcesAllocation { + res.Insert(resourceName) + } + } + } + + return res +} diff --git a/pkg/agent/resourcemanager/outofband/pod_resource_test.go b/pkg/agent/resourcemanager/outofband/pod_resource_test.go new file mode 100644 index 000000000..10a74175f --- 
/dev/null +++ b/pkg/agent/resourcemanager/outofband/pod_resource_test.go @@ -0,0 +1,88 @@ +/* +Copyright 2022 The Katalyst Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package outofband + +import ( + "testing" + + "github.com/stretchr/testify/assert" + "k8s.io/apimachinery/pkg/util/sets" + pluginapi "k8s.io/kubelet/pkg/apis/resourceplugin/v1alpha1" +) + +func TestPodResources(t *testing.T) { + t.Parallel() + + podResource := newPodResourcesChk() + + resourceAllocationInfo := generateResourceAllocationInfo() + + podResource.insert("testPod", "testContainer", "cpu", resourceAllocationInfo) + + containerResources := podResource.podResources("testPod") + assert.NotNil(t, containerResources) + assert.Equal(t, len(containerResources), 1) + containerResources = podResource.podResources("nonPod") + assert.Nil(t, containerResources) + + containerAllResources := podResource.containerAllResources("testPod", "testContainer") + assert.NotNil(t, containerAllResources) + assert.Equal(t, len(containerAllResources), 1) + containerAllResources = podResource.containerAllResources("nonPod", "testContainer") + assert.Nil(t, containerAllResources) + containerAllResources = podResource.containerAllResources("testPod", "nonContainer") + assert.Nil(t, containerAllResources) + + podSet := podResource.pods() + assert.Equal(t, podSet, sets.NewString("testPod")) + resourceSet := podResource.allAllocatedResourceNames() + assert.Equal(t, resourceSet, sets.NewString("cpu")) + + 
podResource.insert("testPod", "testContainer", "memory", resourceAllocationInfo) + podResource.insert("testPod2", "testContainer2", "cpu", resourceAllocationInfo) + entries := podResource.toCheckpointData() + assert.Equal(t, len(entries), 3) + + podResource.deletePod("testPod") + podResource.deletePod("testPod2") + containerResources = podResource.podResources("testPod") + assert.Nil(t, containerResources) + + podResource.insert("testPod", "testContainer", "cpu", resourceAllocationInfo) + podResource.delete([]string{"testPod"}) + containerResources = podResource.podResources("testPod") + assert.Nil(t, containerResources) + + podResource.insert("testPod", "testContainer", "cpu", resourceAllocationInfo) + podResource.deleteResourceAllocationInfo("testPod", "testContainer", "cpu") + containerAllResources = podResource.containerAllResources("testPod", "testContainer") + assert.NotNil(t, containerAllResources) + assert.Equal(t, len(containerAllResources), 0) +} + +func generateResourceAllocationInfo() *pluginapi.ResourceAllocationInfo { + return &pluginapi.ResourceAllocationInfo{ + OciPropertyName: "CpusetCpus", + IsNodeResource: true, + IsScalarResource: true, + AllocatedQuantity: 3, + AllocationResult: "5-6,10", + Envs: map[string]string{"mock_key": "mock_env"}, + Annotations: map[string]string{"mock_key": "mock_ano"}, + ResourceHints: &pluginapi.ListOfTopologyHints{}, + } +} diff --git a/pkg/agent/resourcemanager/outofband/types.go b/pkg/agent/resourcemanager/outofband/types.go new file mode 100644 index 000000000..e3d6fe041 --- /dev/null +++ b/pkg/agent/resourcemanager/outofband/types.go @@ -0,0 +1,37 @@ +/* +Copyright 2022 The Katalyst Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. 
+You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package outofband + +const ( + MetricAddPodTimeout = "ORM_add_pod_timeout" + MetricDeletePodTImeout = "ORM_delete_pod_timeout" + + MainContainerNameAnnotationKey = "kubernetes.io/main-container-name" + + KubeletPluginsDirSELinuxLabel = "system_u:object_r:container_file_t:s0" +) + +const ( + errBadSocket = "bad socketPath, must be an absolute path:" + + // errUnsupportedVersion is the error raised when the resource plugin uses an API version not + // supported by the ORM registry + errUnsupportedVersion = "requested API version %q is not supported by ORM. Supported version is %q" + + // errListenSocket is the error raised when the registry could not listen on the socket + errListenSocket = "failed to listen to socket while starting resource plugin registry, with error" +) diff --git a/pkg/config/agent/agent_base.go b/pkg/config/agent/agent_base.go index fadb8caa4..3e35fd1c1 100644 --- a/pkg/config/agent/agent_base.go +++ b/pkg/config/agent/agent_base.go @@ -20,6 +20,7 @@ import ( "github.com/kubewharf/katalyst-core/pkg/config/agent/dynamic" "github.com/kubewharf/katalyst-core/pkg/config/agent/eviction" "github.com/kubewharf/katalyst-core/pkg/config/agent/global" + "github.com/kubewharf/katalyst-core/pkg/config/agent/orm" "github.com/kubewharf/katalyst-core/pkg/config/agent/qrm" "github.com/kubewharf/katalyst-core/pkg/config/agent/reporter" "github.com/kubewharf/katalyst-core/pkg/config/agent/sysadvisor" @@ -54,6 +55,7 @@ type GenericAgentConfiguration struct { *reporter.GenericReporterConfiguration *sysadvisor.GenericSysAdvisorConfiguration 
*qrm.GenericQRMPluginConfiguration + *orm.GenericORMConfiguration } type StaticAgentConfiguration struct { @@ -73,6 +75,7 @@ func NewGenericAgentConfiguration() *GenericAgentConfiguration { GenericReporterConfiguration: reporter.NewGenericReporterConfiguration(), GenericSysAdvisorConfiguration: sysadvisor.NewGenericSysAdvisorConfiguration(), GenericQRMPluginConfiguration: qrm.NewGenericQRMPluginConfiguration(), + GenericORMConfiguration: orm.NewGenericORMConfiguration(), } } diff --git a/pkg/config/agent/orm/orm_base.go b/pkg/config/agent/orm/orm_base.go new file mode 100644 index 000000000..301c1f4d9 --- /dev/null +++ b/pkg/config/agent/orm/orm_base.go @@ -0,0 +1,33 @@ +/* +Copyright 2022 The Katalyst Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package orm + +import "time" + +type GenericORMConfiguration struct { + ORMRconcilePeriod time.Duration + ORMResourceNamesMap map[string]string + ORMPodNotifyChanLen int +} + +func NewGenericORMConfiguration() *GenericORMConfiguration { + return &GenericORMConfiguration{ + ORMRconcilePeriod: time.Second * 5, + ORMResourceNamesMap: map[string]string{}, + ORMPodNotifyChanLen: 10, + } +} diff --git a/pkg/util/cgroup/common/path.go b/pkg/util/cgroup/common/path.go index 030039260..1989a7aac 100644 --- a/pkg/util/cgroup/common/path.go +++ b/pkg/util/cgroup/common/path.go @@ -133,7 +133,7 @@ func GetKubernetesAnyExistRelativeCgroupPath(suffix string) (string, error) { // GetPodAbsCgroupPath returns absolute cgroup path for pod level func GetPodAbsCgroupPath(subsys, podUID string) (string, error) { - return GetKubernetesAnyExistAbsCgroupPath(subsys, podUID) + return GetKubernetesAnyExistAbsCgroupPath(subsys, fmt.Sprintf("%s%s", PodCgroupPathPrefix, podUID)) } // GetContainerAbsCgroupPath returns absolute cgroup path for container level diff --git a/pkg/util/cgroup/manager/fake_manager.go b/pkg/util/cgroup/manager/fake_manager.go new file mode 100644 index 000000000..c67bd0c89 --- /dev/null +++ b/pkg/util/cgroup/manager/fake_manager.go @@ -0,0 +1,93 @@ +/* +Copyright 2022 The Katalyst Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package manager + +import "github.com/kubewharf/katalyst-core/pkg/util/cgroup/common" + +type FakeCgroupManager struct{} + +func (f *FakeCgroupManager) ApplyMemory(absCgroupPath string, data *common.MemoryData) error { + return nil +} + +func (f *FakeCgroupManager) ApplyCPU(absCgroupPath string, data *common.CPUData) error { + return nil +} + +func (f *FakeCgroupManager) ApplyCPUSet(absCgroupPath string, data *common.CPUSetData) error { + return nil +} + +func (f *FakeCgroupManager) ApplyNetCls(absCgroupPath string, data *common.NetClsData) error { + return nil +} + +func (f *FakeCgroupManager) ApplyIOCostQoS(absCgroupPath string, devID string, data *common.IOCostQoSData) error { + return nil +} + +func (f *FakeCgroupManager) ApplyIOCostModel(absCgroupPath string, devID string, data *common.IOCostModelData) error { + return nil +} + +func (f *FakeCgroupManager) ApplyIOWeight(absCgroupPath string, devID string, weight uint64) error { + return nil +} + +func (f *FakeCgroupManager) ApplyUnifiedData(absCgroupPath, cgroupFileName, data string) error { + return nil +} + +func (f *FakeCgroupManager) GetMemory(absCgroupPath string) (*common.MemoryStats, error) { + return nil, nil +} + +func (f *FakeCgroupManager) GetCPU(absCgroupPath string) (*common.CPUStats, error) { + return nil, nil +} + +func (f *FakeCgroupManager) GetCPUSet(absCgroupPath string) (*common.CPUSetStats, error) { + return nil, nil +} + +func (f *FakeCgroupManager) GetIOCostQoS(absCgroupPath string) (map[string]*common.IOCostQoSData, error) { + return nil, nil +} + +func (f *FakeCgroupManager) GetIOCostModel(absCgroupPath string) (map[string]*common.IOCostModelData, error) { + return nil, nil +} + +func (f *FakeCgroupManager) GetDeviceIOWeight(absCgroupPath string, devID string) (uint64, bool, error) { + return 0, false, nil +} + +func (f *FakeCgroupManager) GetIOStat(absCgroupPath string) (map[string]map[string]string, error) { + return nil, nil +} + +func (f *FakeCgroupManager) 
GetMetrics(relCgroupPath string, subsystems map[string]struct{}) (*common.CgroupMetrics, error) { + return nil, nil +} + +func (f *FakeCgroupManager) GetPids(absCgroupPath string) ([]string, error) { + return nil, nil +} + +func (f *FakeCgroupManager) GetTasks(absCgroupPath string) ([]string, error) { + return nil, nil +}