Skip to content

Commit

Permalink
koordlet: add hooked podresources
Browse files Browse the repository at this point in the history
Signed-off-by: wangjianyu.wjy <[email protected]>
  • Loading branch information
wangjianyu.wjy committed Feb 11, 2025
1 parent 96ff411 commit b447b28
Show file tree
Hide file tree
Showing 11 changed files with 491 additions and 11 deletions.
1 change: 1 addition & 0 deletions .licenseignore
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ pkg/scheduler/frameworkext/topologymanager/policy_restricted_test.go
pkg/scheduler/frameworkext/topologymanager/policy_single_numa_node.go
pkg/scheduler/frameworkext/topologymanager/policy_single_numa_node_test.go
pkg/koordlet/util/kubelet
pkg/koordlet/statesinformer/impl
pkg/util/cpuset/cpuset.go
pkg/util/cpuset/cpuset_test.go
pkg/util/bitmask/bitmask.go
Expand Down
7 changes: 7 additions & 0 deletions config/manager/koordlet.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,9 @@ spec:
mountPropagation: HostToContainer
- mountPath: /metric-data/
name: metric-db-path
- mountPath: /var/lib/koordlet/pod-resources/
mountPropagation: Bidirectional
name: hooked-pod-resources
hostNetwork: true
hostPID: true
restartPolicy: Always
Expand Down Expand Up @@ -148,3 +151,7 @@ spec:
- emptyDir:
sizeLimit: 150Mi
name: metric-db-path
- hostPath:
path: /var/lib/koordlet/pod-resources/
type: DirectoryOrCreate
name: hooked-pod-resources
8 changes: 8 additions & 0 deletions pkg/features/koordlet_features.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,13 @@ const (
// Backend applications can enable the hugepages based on the allocation results.
// For example, the CSI mounts the pre-allocated hugepages into the pod.
HugePageReport featuregate.Feature = "HugePageReport"

// owner: @ZiMengSheng
// alpha: v1.6
//
// HookedPodResources enabled hooked podResources of kubelet provided by koordlet.
// It provides a grpc service to enable discovery of pod resources allocated by koordinator system.
HookedPodResources featuregate.Feature = "HookedPodResources"
)

func init() {
Expand Down Expand Up @@ -177,6 +184,7 @@ var (
BlkIOReconcile: {Default: false, PreRelease: featuregate.Alpha},
ColdPageCollector: {Default: false, PreRelease: featuregate.Alpha},
HugePageReport: {Default: false, PreRelease: featuregate.Alpha},
HookedPodResources: {Default: false, PreRelease: featuregate.Alpha},
}
)

Expand Down
3 changes: 3 additions & 0 deletions pkg/koordlet/statesinformer/impl/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ type Config struct {
EnableNodeMetricReport bool
MetricReportInterval time.Duration // Deprecated
EnablePodTaskIds bool
HookedPodResourceSocketPath string
}

func NewDefaultConfig() *Config {
Expand All @@ -47,6 +48,7 @@ func NewDefaultConfig() *Config {
DisableQueryKubeletConfig: false,
EnableNodeMetricReport: true,
EnablePodTaskIds: false,
HookedPodResourceSocketPath: "/var/lib/koordlet/pod-resources",
}
}

Expand All @@ -61,4 +63,5 @@ func (c *Config) InitFlags(fs *flag.FlagSet) {
fs.DurationVar(&c.MetricReportInterval, "report-interval", c.MetricReportInterval, "Deprecated since v1.1, use ColocationStrategy.MetricReportIntervalSeconds in config map of slo-controller")
fs.BoolVar(&c.EnableNodeMetricReport, "enable-node-metric-report", c.EnableNodeMetricReport, "Enable status update of node metric crd.")
fs.BoolVar(&c.EnablePodTaskIds, "enable-pod-taskids", c.EnablePodTaskIds, "Enable pod taskids in statesinformer.")
fs.StringVar(&c.HookedPodResourceSocketPath, "hooked-pod-resource-socket-path", c.HookedPodResourceSocketPath, "The path of the socket file for the pod resource hook.")
}
2 changes: 2 additions & 0 deletions pkg/koordlet/statesinformer/impl/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ func TestNewDefaultConfig(t *testing.T) {
EnableNodeMetricReport: true,
MetricReportInterval: 0,
EnablePodTaskIds: false,
HookedPodResourceSocketPath: "/var/lib/koordlet/pod-resources",
},
},
}
Expand Down Expand Up @@ -116,6 +117,7 @@ func TestConfig_InitFlags(t *testing.T) {
DisableQueryKubeletConfig: tt.fields.DisableQueryKubeletConfig,
EnableNodeMetricReport: tt.fields.EnableNodeMetricReport,
EnablePodTaskIds: tt.fields.EnablePodTaskIds,
HookedPodResourceSocketPath: "/var/lib/koordlet/pod-resources",
}
c := NewDefaultConfig()
c.InitFlags(tt.args.fs)
Expand Down
98 changes: 98 additions & 0 deletions pkg/koordlet/statesinformer/impl/kubelet_resource_client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
/*
Copyright (c) 2019 Intel Corporation
Copyright (c) 2021 Multus Authors
Copyright 2022 The Koordinator Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package impl

import (
"fmt"
"net"
"net/url"
"os"
"path/filepath"
"time"

"golang.org/x/net/context"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
"k8s.io/klog/v2"

podresourcesapi "k8s.io/kubelet/pkg/apis/podresources/v1"
)

const (
defaultKubeletSocket = "kubelet" // which is defined in k8s.io/kubernetes/pkg/kubelet/apis/podresources
kubeletConnectionTimeout = 10 * time.Second
defaultPodResourcesMaxSize = 1024 * 1024 * 16 // 16 Mb
defaultPodResourcesPath = "/var/lib/kubelet/pod-resources"
unixProtocol = "unix"
)

// LocalEndpoint returns the full path to a unix socket at the given endpoint
// which is in k8s.io/kubernetes/pkg/kubelet/util
func localEndpoint(path string) *url.URL {
return &url.URL{
Scheme: unixProtocol,
Path: path + ".sock",
}
}

// GetResourceClient returns an instance of ResourceClient interface initialized with Pod resource information
func GetResourceClient(kubeletSocket string) (podresourcesapi.PodResourcesListerClient, error) {
kubeletSocketURL := localEndpoint(filepath.Join(defaultPodResourcesPath, defaultKubeletSocket))

if kubeletSocket != "" {
kubeletSocketURL = &url.URL{
Scheme: unixProtocol,
Path: kubeletSocket,
}
}
// If Kubelet resource API endpoint exist use that by default
// Or else fallback with checkpoint file
if hasKubeletAPIEndpoint(kubeletSocketURL) {
klog.V(5).Info("GetResourceClient: using Kubelet resource API endpoint")
kubeletResourceClient, _, err := getKubeletResourceClient(kubeletSocketURL, kubeletConnectionTimeout)
return kubeletResourceClient, err
}
return nil, fmt.Errorf("GetResourceClient: no resource client found")
}

func hasKubeletAPIEndpoint(url *url.URL) bool {
// Check for kubelet resource API socket file
if _, err := os.Stat(url.Path); err != nil {
klog.Errorf("hasKubeletAPIEndpoint: error looking up kubelet resource api socket file: %q", err)
return false
}
return true
}

func getKubeletResourceClient(kubeletSocketURL *url.URL, timeout time.Duration) (podresourcesapi.PodResourcesListerClient, *grpc.ClientConn, error) {
ctx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()

conn, err := grpc.DialContext(ctx, kubeletSocketURL.Path, grpc.WithTransportCredentials(insecure.NewCredentials()),
grpc.WithContextDialer(dial),
grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(defaultPodResourcesMaxSize)))
if err != nil {
return nil, nil, fmt.Errorf("error dialing socket %s: %v", kubeletSocketURL.Path, err)
}
return podresourcesapi.NewPodResourcesListerClient(conn), conn, nil
}

func dial(ctx context.Context, addr string) (net.Conn, error) {
return (&net.Dialer{}).DialContext(ctx, unixProtocol, addr)
}
13 changes: 7 additions & 6 deletions pkg/koordlet/statesinformer/impl/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,11 @@ package impl
// NOTE: variables in this file can be overwritten for extension

var DefaultPluginRegistry = map[PluginName]informerPlugin{
nodeSLOInformerName: NewNodeSLOInformer(),
pvcInformerName: NewPVCInformer(),
nodeTopoInformerName: NewNodeTopoInformer(),
nodeInformerName: NewNodeInformer(),
podsInformerName: NewPodsInformer(),
nodeMetricInformerName: NewNodeMetricInformer(),
nodeSLOInformerName: NewNodeSLOInformer(),
pvcInformerName: NewPVCInformer(),
nodeTopoInformerName: NewNodeTopoInformer(),
nodeInformerName: NewNodeInformer(),
podsInformerName: NewPodsInformer(),
podResourcesInformerName: newPodResourcesInformer(),
nodeMetricInformerName: NewNodeMetricInformer(),
}
189 changes: 189 additions & 0 deletions pkg/koordlet/statesinformer/impl/states_pod_resources.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,189 @@
/*
Copyright 2022 The Koordinator Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package impl

import (
"context"
"net"
"os"
"path/filepath"

"google.golang.org/grpc"
corev1 "k8s.io/api/core/v1"
"k8s.io/client-go/tools/cache"
"k8s.io/klog/v2"
podresourcesapi "k8s.io/kubelet/pkg/apis/podresources/v1"

"github.com/koordinator-sh/koordinator/apis/extension"
schedulingv1alpha1 "github.com/koordinator-sh/koordinator/apis/scheduling/v1alpha1"
"github.com/koordinator-sh/koordinator/pkg/features"
)

const (
podResourcesInformerName PluginName = "podResourcesInformer"
)

var (
_ podresourcesapi.PodResourcesListerServer = &podResourcesServer{}
)

type podResourcesInformer struct {
config *Config
nodeInformer *nodeInformer
resourceServer podresourcesapi.PodResourcesListerServer
}

func newPodResourcesInformer() *podResourcesInformer {
return &podResourcesInformer{}
}

func (s *podResourcesInformer) Setup(ctx *PluginOption, states *PluginState) {
s.config = ctx.config

nodeInformerIf := states.informerPlugins[nodeInformerName]
nodeInformer, ok := nodeInformerIf.(*nodeInformer)
if !ok {
klog.Fatalf("node informer format error")
}
s.nodeInformer = nodeInformer
}

func (s *podResourcesInformer) Start(stopCh <-chan struct{}) {
if !features.DefaultKoordletFeatureGate.Enabled(features.HookedPodResources) {
return
}
klog.V(2).Infof("starting pod resources informer")
if !cache.WaitForCacheSync(stopCh, s.nodeInformer.HasSynced) {
klog.Fatalf("timed out waiting for node caches to sync")
}
stub, err := newKubeletStubFromConfig(s.nodeInformer.GetNode(), s.config)
if err != nil {
klog.Fatalf("create kubelet stub, %v", err)
}
resourceClient, err := GetResourceClient("")
if err != nil {
klog.Fatalf("create resource client, %v", err)
}
s.resourceServer = &podResourcesServer{
kubeletStub: stub,
podResourceClient: resourceClient,
}
err = s.startServer()
if err != nil {
klog.Fatalf("start grpc server, %v", err)
}
}

func (s *podResourcesInformer) startServer() error {
grpcServerFullSocketPath := filepath.Join(s.config.HookedPodResourceSocketPath, defaultKubeletSocket+".sock")
err := cleanup(grpcServerFullSocketPath)
if err != nil {
klog.Errorf("failed to cleanup %s: %s", grpcServerFullSocketPath, err.Error())
return err
}
sock, err := net.Listen("unix", grpcServerFullSocketPath)
if err != nil {
klog.Errorf("failed to listen: %s", err.Error())
return err
}
server := grpc.NewServer()
podresourcesapi.RegisterPodResourcesListerServer(server, s.resourceServer)
klog.Infof("Starting GRPC server, grpcServerSocketFullPath: %s", grpcServerFullSocketPath)
go func() {
err := server.Serve(sock)
if err != nil {
server.Stop()
klog.Fatalf("plugin exited with error %s", err.Error())
}
}()
return nil
}

func cleanup(grpcServerSocketFullPath string) error {
if err := os.Remove(grpcServerSocketFullPath); err != nil && !os.IsNotExist(err) {
return err
}
return nil
}

func (s *podResourcesInformer) HasSynced() bool {
return true
}

type podResourcesServer struct {
podResourceClient podresourcesapi.PodResourcesListerClient
kubeletStub KubeletStub
}

func (p *podResourcesServer) List(ctx context.Context, request *podresourcesapi.ListPodResourcesRequest) (*podresourcesapi.ListPodResourcesResponse, error) {
response, err := p.podResourceClient.List(ctx, request)
if err != nil {
return nil, err
}
allPods, err := p.kubeletStub.GetAllPods()
if err != nil {
return nil, err
}
fillPodDevicesAllocatedByKoord(response, &allPods)
return response, nil
}

func fillPodDevicesAllocatedByKoord(response *podresourcesapi.ListPodResourcesResponse, allPods *corev1.PodList) {
deviceTypeToResourceName := map[schedulingv1alpha1.DeviceType]string{
schedulingv1alpha1.GPU: string(extension.ResourceNvidiaGPU),
schedulingv1alpha1.RDMA: string(extension.ResourceRDMA),
}

for _, podResource := range response.PodResources {
for _, pod := range allPods.Items {
if pod.Namespace == podResource.Namespace && pod.Name == podResource.Name {
deviceAllocations, err := extension.GetDeviceAllocations(pod.Annotations)
if err != nil || deviceAllocations == nil {
continue
}

for deviceType, deviceAllocation := range deviceAllocations {
var deviceIDs []string
for _, device := range deviceAllocation {
deviceIDs = append(deviceIDs, device.ID)
}
podResource.Containers[0].Devices = append(podResource.Containers[0].Devices, &podresourcesapi.ContainerDevices{
ResourceName: deviceTypeToResourceName[deviceType],
DeviceIds: deviceIDs,
})
}
break
}
}
}
}

func (p *podResourcesServer) GetAllocatableResources(ctx context.Context, request *podresourcesapi.AllocatableResourcesRequest) (*podresourcesapi.AllocatableResourcesResponse, error) {
response, err := p.podResourceClient.GetAllocatableResources(ctx, request)
if err != nil {
return nil, err
}
return response, nil
}

func (p *podResourcesServer) Get(ctx context.Context, request *podresourcesapi.GetPodResourcesRequest) (*podresourcesapi.GetPodResourcesResponse, error) {
response, err := p.podResourceClient.Get(ctx, request)
if err != nil {
return nil, err
}
return response, err
}
Loading

0 comments on commit b447b28

Please sign in to comment.