From 7efd397d4a085d70622d492d91b2d3a586bf09ae Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Kuba=20Tu=C5=BCnik?= Date: Thu, 21 Nov 2024 17:37:26 +0100 Subject: [PATCH] CA: move NodeInfo methods from SnapshotBase to ClusterSnapshot All the NodeInfo methods have to take DRA into account, and the logic for that will be the same for different ClusterSnapshotStore implementations. Instead of duplicating the new logic in Basic and Delta, the methods are moved to ClusterSnapshot and the logic will be implemented once in PredicateSnapshot. PredicateSnapshot will use the DRA Snapshot exposed by its ClusterSnapshotStore to implement these methods. The DRA Snapshot has to be stored in the ClusterSnapshotStore layer, as we need to be able to fork/commit/revert it. Lower-level methods for adding/removing just the schedulerframework.NodeInfo parts are added to ClusterSnapshotStore. PredicateSnapshot utilizes these methods to implement AddNodeInfo and RemoveNodeInfo. This should be a no-op, it's just a refactor. 
--- .../clustersnapshot/clustersnapshot.go | 33 ++++++++----- .../predicate/plugin_runner.go | 18 +++---- .../predicate/plugin_runner_test.go | 47 ++++++++++--------- .../predicate/predicate_snapshot.go | 37 ++++++++++++++- .../simulator/clustersnapshot/store/basic.go | 30 ++---------- .../simulator/clustersnapshot/store/delta.go | 30 ++---------- .../store/delta_benchmark_test.go | 6 ++- 7 files changed, 103 insertions(+), 98 deletions(-) diff --git a/cluster-autoscaler/simulator/clustersnapshot/clustersnapshot.go b/cluster-autoscaler/simulator/clustersnapshot/clustersnapshot.go index 52dfc8898e75..bd4aa9494120 100644 --- a/cluster-autoscaler/simulator/clustersnapshot/clustersnapshot.go +++ b/cluster-autoscaler/simulator/clustersnapshot/clustersnapshot.go @@ -23,6 +23,7 @@ import ( drasnapshot "k8s.io/autoscaler/cluster-autoscaler/simulator/dynamicresources/snapshot" "k8s.io/autoscaler/cluster-autoscaler/simulator/framework" "k8s.io/klog/v2" + schedulerframework "k8s.io/kubernetes/pkg/scheduler/framework" ) // ClusterSnapshot is abstraction of cluster state used for predicate simulations. @@ -30,6 +31,19 @@ import ( type ClusterSnapshot interface { ClusterSnapshotStore + // AddNodeInfo adds the given NodeInfo to the snapshot without checking scheduler predicates. The Node and the Pods are added, + // as well as any DRA objects passed along with them. + AddNodeInfo(nodeInfo *framework.NodeInfo) error + // RemoveNodeInfo removes the given NodeInfo from the snapshot. The Node and the Pods are removed, as well as + // any DRA objects owned by them. + RemoveNodeInfo(nodeName string) error + // GetNodeInfo returns an internal NodeInfo for a given Node - all information about the Node tracked in the snapshot. + // This means the Node itself, its scheduled Pods, as well as all relevant DRA objects. The internal NodeInfos + // obtained via this method should always be used in CA code instead of directly using *schedulerframework.NodeInfo. 
+ GetNodeInfo(nodeName string) (*framework.NodeInfo, error) + // ListNodeInfos returns internal NodeInfos for all Nodes tracked in the snapshot. See the comment on GetNodeInfo. + ListNodeInfos() ([]*framework.NodeInfo, error) + // SchedulePod tries to schedule the given Pod on the Node with the given name inside the snapshot, // checking scheduling predicates. The pod is only scheduled if the predicates pass. If the pod is scheduled, // all relevant DRA objects are modified to reflect that. Returns nil if the pod got scheduled, and a non-nil @@ -68,18 +82,13 @@ type ClusterSnapshotStore interface { // ForceRemovePod removes the given Pod (and all DRA objects it owns) from the snapshot. ForceRemovePod(namespace string, podName string, nodeName string) error - // AddNodeInfo adds the given NodeInfo to the snapshot without checking scheduler predicates. The Node and the Pods are added, - // as well as any DRA objects passed along them. - AddNodeInfo(nodeInfo *framework.NodeInfo) error - // RemoveNodeInfo removes the given NodeInfo from the snapshot The Node and the Pods are removed, as well as - // any DRA objects owned by them. - RemoveNodeInfo(nodeName string) error - // GetNodeInfo returns an internal NodeInfo for a given Node - all information about the Node tracked in the snapshot. - // This means the Node itself, its scheduled Pods, as well as all relevant DRA objects. The internal NodeInfos - // obtained via this method should always be used in CA code instead of directly using *schedulerframework.NodeInfo. - GetNodeInfo(nodeName string) (*framework.NodeInfo, error) - // ListNodeInfos returns internal NodeInfos for all Nodes tracked in the snapshot. See the comment on GetNodeInfo. - ListNodeInfos() ([]*framework.NodeInfo, error) + // AddSchedulerNodeInfo adds the given schedulerframework.NodeInfo to the snapshot without checking scheduler predicates, and + // without taking DRA objects into account. 
This shouldn't be used outside the clustersnapshot pkg, use ClusterSnapshot.AddNodeInfo() + // instead. + AddSchedulerNodeInfo(nodeInfo *schedulerframework.NodeInfo) error + // RemoveSchedulerNodeInfo removes the given schedulerframework.NodeInfo from the snapshot without taking DRA objects into account. This shouldn't + // be used outside the clustersnapshot pkg, use ClusterSnapshot.RemoveNodeInfo() instead. + RemoveSchedulerNodeInfo(nodeName string) error // DraSnapshot returns an interface that allows accessing and modifying the DRA objects in the snapshot. DraSnapshot() drasnapshot.Snapshot diff --git a/cluster-autoscaler/simulator/clustersnapshot/predicate/plugin_runner.go b/cluster-autoscaler/simulator/clustersnapshot/predicate/plugin_runner.go index 150b245fdd71..5a0953009ad0 100644 --- a/cluster-autoscaler/simulator/clustersnapshot/predicate/plugin_runner.go +++ b/cluster-autoscaler/simulator/clustersnapshot/predicate/plugin_runner.go @@ -30,14 +30,14 @@ import ( // SchedulerPluginRunner can be used to run various phases of scheduler plugins through the scheduler framework. type SchedulerPluginRunner struct { - fwHandle *framework.Handle - snapshotStore clustersnapshot.ClusterSnapshotStore - lastIndex int + fwHandle *framework.Handle + snapshot clustersnapshot.ClusterSnapshot + lastIndex int } // NewSchedulerPluginRunner builds a SchedulerPluginRunner. 
-func NewSchedulerPluginRunner(fwHandle *framework.Handle, snapshotStore clustersnapshot.ClusterSnapshotStore) *SchedulerPluginRunner { - return &SchedulerPluginRunner{fwHandle: fwHandle, snapshotStore: snapshotStore} +func NewSchedulerPluginRunner(fwHandle *framework.Handle, snapshot clustersnapshot.ClusterSnapshot) *SchedulerPluginRunner { + return &SchedulerPluginRunner{fwHandle: fwHandle, snapshot: snapshot} } // RunFiltersUntilPassingNode runs the scheduler framework PreFilter phase once, and then keeps running the Filter phase for all nodes in the cluster that match the provided @@ -45,12 +45,12 @@ func NewSchedulerPluginRunner(fwHandle *framework.Handle, snapshotStore clusters // // The node iteration always starts from the next Node from the last Node that was found by this method. TODO: Extract the iteration strategy out of SchedulerPluginRunner. func (p *SchedulerPluginRunner) RunFiltersUntilPassingNode(pod *apiv1.Pod, nodeMatches func(*framework.NodeInfo) bool) (string, clustersnapshot.SchedulingError) { - nodeInfosList, err := p.snapshotStore.ListNodeInfos() + nodeInfosList, err := p.snapshot.ListNodeInfos() if err != nil { return "", clustersnapshot.NewSchedulingInternalError(pod, fmt.Sprintf("error listing NodeInfos: %v", err)) } - p.fwHandle.DelegatingLister.UpdateDelegate(p.snapshotStore) + p.fwHandle.DelegatingLister.UpdateDelegate(p.snapshot) defer p.fwHandle.DelegatingLister.ResetDelegate() state := schedulerframework.NewCycleState() @@ -101,12 +101,12 @@ func (p *SchedulerPluginRunner) RunFiltersUntilPassingNode(pod *apiv1.Pod, nodeM // RunFiltersOnNode runs the scheduler framework PreFilter and Filter phases to check if the given pod can be scheduled on the given node. 
func (p *SchedulerPluginRunner) RunFiltersOnNode(pod *apiv1.Pod, nodeName string) clustersnapshot.SchedulingError { - nodeInfo, err := p.snapshotStore.GetNodeInfo(nodeName) + nodeInfo, err := p.snapshot.GetNodeInfo(nodeName) if err != nil { return clustersnapshot.NewSchedulingInternalError(pod, fmt.Sprintf("error obtaining NodeInfo for name %q: %v", nodeName, err)) } - p.fwHandle.DelegatingLister.UpdateDelegate(p.snapshotStore) + p.fwHandle.DelegatingLister.UpdateDelegate(p.snapshot) defer p.fwHandle.DelegatingLister.ResetDelegate() state := schedulerframework.NewCycleState() diff --git a/cluster-autoscaler/simulator/clustersnapshot/predicate/plugin_runner_test.go b/cluster-autoscaler/simulator/clustersnapshot/predicate/plugin_runner_test.go index ad6e280673a7..5c6a7c9d41db 100644 --- a/cluster-autoscaler/simulator/clustersnapshot/predicate/plugin_runner_test.go +++ b/cluster-autoscaler/simulator/clustersnapshot/predicate/plugin_runner_test.go @@ -139,11 +139,9 @@ func TestRunFiltersOnNode(t *testing.T) { } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - snapshotStore := store.NewBasicSnapshotStore() - err := snapshotStore.AddNodeInfo(framework.NewTestNodeInfo(tt.node, tt.scheduledPods...)) + pluginRunner, snapshot, err := newTestPluginRunnerAndSnapshot(tt.customConfig) assert.NoError(t, err) - - pluginRunner, err := newTestPluginRunner(snapshotStore, tt.customConfig) + err = snapshot.AddNodeInfo(framework.NewTestNodeInfo(tt.node, tt.scheduledPods...)) assert.NoError(t, err) predicateError := pluginRunner.RunFiltersOnNode(tt.testPod, tt.node.Name) @@ -235,15 +233,14 @@ func TestRunFilterUntilPassingNode(t *testing.T) { }, } - snapshotStore := store.NewBasicSnapshotStore() - err = snapshotStore.AddNodeInfo(framework.NewTestNodeInfo(n1000)) - assert.NoError(t, err) - err = snapshotStore.AddNodeInfo(framework.NewTestNodeInfo(n2000)) - assert.NoError(t, err) - for _, tc := range testCases { t.Run(tc.name, func(t *testing.T) { - pluginRunner, err := 
newTestPluginRunner(snapshotStore, tc.customConfig) + pluginRunner, snapshot, err := newTestPluginRunnerAndSnapshot(tc.customConfig) + assert.NoError(t, err) + + err = snapshot.AddNodeInfo(framework.NewTestNodeInfo(n1000)) + assert.NoError(t, err) + err = snapshot.AddNodeInfo(framework.NewTestNodeInfo(n2000)) assert.NoError(t, err) nodeName, err := pluginRunner.RunFiltersUntilPassingNode(tc.pod, func(info *framework.NodeInfo) bool { return true }) @@ -274,14 +271,14 @@ func TestDebugInfo(t *testing.T) { } SetNodeReadyState(node1, true, time.Time{}) - clusterSnapshot := store.NewBasicSnapshotStore() - err := clusterSnapshot.AddNodeInfo(framework.NewTestNodeInfo(node1)) + // with default predicate checker + defaultPluginnRunner, clusterSnapshot, err := newTestPluginRunnerAndSnapshot(nil) assert.NoError(t, err) - // with default predicate checker - defaultPluginRunner, err := newTestPluginRunner(clusterSnapshot, nil) + err = clusterSnapshot.AddNodeInfo(framework.NewTestNodeInfo(node1)) assert.NoError(t, err) - predicateErr := defaultPluginRunner.RunFiltersOnNode(p1, "n1") + + predicateErr := defaultPluginnRunner.RunFiltersOnNode(p1, "n1") assert.NotNil(t, predicateErr) assert.Contains(t, predicateErr.FailingPredicateReasons(), "node(s) had untolerated taint {SomeTaint: WhyNot?}") assert.Contains(t, predicateErr.Error(), "node(s) had untolerated taint {SomeTaint: WhyNot?}") @@ -305,25 +302,29 @@ func TestDebugInfo(t *testing.T) { customConfig, err := scheduler.ConfigFromPath(customConfigFile) assert.NoError(t, err) - customPluginRunner, err := newTestPluginRunner(clusterSnapshot, customConfig) + customPluginnRunner, clusterSnapshot, err := newTestPluginRunnerAndSnapshot(customConfig) assert.NoError(t, err) - predicateErr = customPluginRunner.RunFiltersOnNode(p1, "n1") + + err = clusterSnapshot.AddNodeInfo(framework.NewTestNodeInfo(node1)) + assert.NoError(t, err) + + predicateErr = customPluginnRunner.RunFiltersOnNode(p1, "n1") assert.Nil(t, predicateErr) } -// 
newTestPluginRunner builds test version of SchedulerPluginRunner. -func newTestPluginRunner(snapshotStore clustersnapshot.ClusterSnapshotStore, schedConfig *config.KubeSchedulerConfiguration) (*SchedulerPluginRunner, error) { +func newTestPluginRunnerAndSnapshot(schedConfig *config.KubeSchedulerConfiguration) (*SchedulerPluginRunner, clustersnapshot.ClusterSnapshot, error) { if schedConfig == nil { defaultConfig, err := scheduler_config_latest.Default() if err != nil { - return nil, err + return nil, nil, err } schedConfig = defaultConfig } fwHandle, err := framework.NewHandle(informers.NewSharedInformerFactory(clientsetfake.NewSimpleClientset(), 0), schedConfig, true) if err != nil { - return nil, err + return nil, nil, err } - return NewSchedulerPluginRunner(fwHandle, snapshotStore), nil + snapshot := NewPredicateSnapshot(store.NewBasicSnapshotStore(), fwHandle, false) + return NewSchedulerPluginRunner(fwHandle, snapshot), snapshot, nil } diff --git a/cluster-autoscaler/simulator/clustersnapshot/predicate/predicate_snapshot.go b/cluster-autoscaler/simulator/clustersnapshot/predicate/predicate_snapshot.go index 4697a8d5093a..dd57b47ffa39 100644 --- a/cluster-autoscaler/simulator/clustersnapshot/predicate/predicate_snapshot.go +++ b/cluster-autoscaler/simulator/clustersnapshot/predicate/predicate_snapshot.go @@ -32,11 +32,44 @@ type PredicateSnapshot struct { // NewPredicateSnapshot builds a PredicateSnapshot. func NewPredicateSnapshot(snapshotStore clustersnapshot.ClusterSnapshotStore, fwHandle *framework.Handle, draEnabled bool) *PredicateSnapshot { - return &PredicateSnapshot{ + snapshot := &PredicateSnapshot{ ClusterSnapshotStore: snapshotStore, - pluginRunner: NewSchedulerPluginRunner(fwHandle, snapshotStore), draEnabled: draEnabled, } + snapshot.pluginRunner = NewSchedulerPluginRunner(fwHandle, snapshot) + return snapshot +} + +// GetNodeInfo returns an internal NodeInfo wrapping the relevant schedulerframework.NodeInfo. 
+func (s *PredicateSnapshot) GetNodeInfo(nodeName string) (*framework.NodeInfo, error) { + schedNodeInfo, err := s.ClusterSnapshotStore.NodeInfos().Get(nodeName) + if err != nil { + return nil, err + } + return framework.WrapSchedulerNodeInfo(schedNodeInfo, nil, nil), nil +} + +// ListNodeInfos returns internal NodeInfos wrapping all schedulerframework.NodeInfos in the snapshot. +func (s *PredicateSnapshot) ListNodeInfos() ([]*framework.NodeInfo, error) { + schedNodeInfos, err := s.ClusterSnapshotStore.NodeInfos().List() + if err != nil { + return nil, err + } + var result []*framework.NodeInfo + for _, schedNodeInfo := range schedNodeInfos { + result = append(result, framework.WrapSchedulerNodeInfo(schedNodeInfo, nil, nil)) + } + return result, nil +} + +// AddNodeInfo adds the provided internal NodeInfo to the snapshot. +func (s *PredicateSnapshot) AddNodeInfo(nodeInfo *framework.NodeInfo) error { + return s.ClusterSnapshotStore.AddSchedulerNodeInfo(nodeInfo.ToScheduler()) +} + +// RemoveNodeInfo removes a NodeInfo matching the provided nodeName from the snapshot. +func (s *PredicateSnapshot) RemoveNodeInfo(nodeName string) error { + return s.ClusterSnapshotStore.RemoveSchedulerNodeInfo(nodeName) } // SchedulePod adds pod to the snapshot and schedules it to given node. 
diff --git a/cluster-autoscaler/simulator/clustersnapshot/store/basic.go b/cluster-autoscaler/simulator/clustersnapshot/store/basic.go index 0bfe38819e20..8c62b720685b 100644 --- a/cluster-autoscaler/simulator/clustersnapshot/store/basic.go +++ b/cluster-autoscaler/simulator/clustersnapshot/store/basic.go @@ -22,7 +22,6 @@ import ( apiv1 "k8s.io/api/core/v1" "k8s.io/autoscaler/cluster-autoscaler/simulator/clustersnapshot" drasnapshot "k8s.io/autoscaler/cluster-autoscaler/simulator/dynamicresources/snapshot" - "k8s.io/autoscaler/cluster-autoscaler/simulator/framework" "k8s.io/klog/v2" schedulerframework "k8s.io/kubernetes/pkg/scheduler/framework" ) @@ -213,31 +212,12 @@ func (snapshot *BasicSnapshotStore) DraSnapshot() drasnapshot.Snapshot { return snapshot.getInternalData().draSnapshot } -// GetNodeInfo gets a NodeInfo. -func (snapshot *BasicSnapshotStore) GetNodeInfo(nodeName string) (*framework.NodeInfo, error) { - schedNodeInfo, err := snapshot.getInternalData().getNodeInfo(nodeName) - if err != nil { - return nil, err - } - return framework.WrapSchedulerNodeInfo(schedNodeInfo, nil, nil), nil -} - -// ListNodeInfos lists NodeInfos. -func (snapshot *BasicSnapshotStore) ListNodeInfos() ([]*framework.NodeInfo, error) { - schedNodeInfos := snapshot.getInternalData().listNodeInfos() - var result []*framework.NodeInfo - for _, schedNodeInfo := range schedNodeInfos { - result = append(result, framework.WrapSchedulerNodeInfo(schedNodeInfo, nil, nil)) - } - return result, nil -} - -// AddNodeInfo adds a NodeInfo. -func (snapshot *BasicSnapshotStore) AddNodeInfo(nodeInfo *framework.NodeInfo) error { +// AddSchedulerNodeInfo adds a NodeInfo. 
+func (snapshot *BasicSnapshotStore) AddSchedulerNodeInfo(nodeInfo *schedulerframework.NodeInfo) error { if err := snapshot.getInternalData().addNode(nodeInfo.Node()); err != nil { return err } - for _, podInfo := range nodeInfo.Pods() { + for _, podInfo := range nodeInfo.Pods { if err := snapshot.getInternalData().addPod(podInfo.Pod, nodeInfo.Node().Name); err != nil { return err } @@ -267,8 +247,8 @@ func (snapshot *BasicSnapshotStore) SetClusterState(nodes []*apiv1.Node, schedul return nil } -// RemoveNodeInfo removes nodes (and pods scheduled to it) from the snapshot. -func (snapshot *BasicSnapshotStore) RemoveNodeInfo(nodeName string) error { +// RemoveSchedulerNodeInfo removes nodes (and pods scheduled to it) from the snapshot. +func (snapshot *BasicSnapshotStore) RemoveSchedulerNodeInfo(nodeName string) error { return snapshot.getInternalData().removeNodeInfo(nodeName) } diff --git a/cluster-autoscaler/simulator/clustersnapshot/store/delta.go b/cluster-autoscaler/simulator/clustersnapshot/store/delta.go index f444ff2f11e4..40154047491a 100644 --- a/cluster-autoscaler/simulator/clustersnapshot/store/delta.go +++ b/cluster-autoscaler/simulator/clustersnapshot/store/delta.go @@ -22,7 +22,6 @@ import ( apiv1 "k8s.io/api/core/v1" "k8s.io/autoscaler/cluster-autoscaler/simulator/clustersnapshot" drasnapshot "k8s.io/autoscaler/cluster-autoscaler/simulator/dynamicresources/snapshot" - "k8s.io/autoscaler/cluster-autoscaler/simulator/framework" "k8s.io/klog/v2" schedulerframework "k8s.io/kubernetes/pkg/scheduler/framework" ) @@ -416,31 +415,12 @@ func (snapshot *DeltaSnapshotStore) DraSnapshot() drasnapshot.Snapshot { return drasnapshot.Snapshot{} } -// GetNodeInfo gets a NodeInfo. 
-func (snapshot *DeltaSnapshotStore) GetNodeInfo(nodeName string) (*framework.NodeInfo, error) { - schedNodeInfo, err := snapshot.getNodeInfo(nodeName) - if err != nil { - return nil, err - } - return framework.WrapSchedulerNodeInfo(schedNodeInfo, nil, nil), nil -} - -// ListNodeInfos lists NodeInfos. -func (snapshot *DeltaSnapshotStore) ListNodeInfos() ([]*framework.NodeInfo, error) { - schedNodeInfos := snapshot.data.getNodeInfoList() - var result []*framework.NodeInfo - for _, schedNodeInfo := range schedNodeInfos { - result = append(result, framework.WrapSchedulerNodeInfo(schedNodeInfo, nil, nil)) - } - return result, nil -} - -// AddNodeInfo adds a NodeInfo. -func (snapshot *DeltaSnapshotStore) AddNodeInfo(nodeInfo *framework.NodeInfo) error { +// AddSchedulerNodeInfo adds a NodeInfo. +func (snapshot *DeltaSnapshotStore) AddSchedulerNodeInfo(nodeInfo *schedulerframework.NodeInfo) error { if err := snapshot.data.addNode(nodeInfo.Node()); err != nil { return err } - for _, podInfo := range nodeInfo.Pods() { + for _, podInfo := range nodeInfo.Pods { if err := snapshot.data.addPod(podInfo.Pod, nodeInfo.Node().Name); err != nil { return err } @@ -470,8 +450,8 @@ func (snapshot *DeltaSnapshotStore) SetClusterState(nodes []*apiv1.Node, schedul return nil } -// RemoveNodeInfo removes nodes (and pods scheduled to it) from the snapshot. -func (snapshot *DeltaSnapshotStore) RemoveNodeInfo(nodeName string) error { +// RemoveSchedulerNodeInfo removes nodes (and pods scheduled to it) from the snapshot. 
+func (snapshot *DeltaSnapshotStore) RemoveSchedulerNodeInfo(nodeName string) error { return snapshot.data.removeNodeInfo(nodeName) } diff --git a/cluster-autoscaler/simulator/clustersnapshot/store/delta_benchmark_test.go b/cluster-autoscaler/simulator/clustersnapshot/store/delta_benchmark_test.go index 8440b73dc3cd..c10776cc9de8 100644 --- a/cluster-autoscaler/simulator/clustersnapshot/store/delta_benchmark_test.go +++ b/cluster-autoscaler/simulator/clustersnapshot/store/delta_benchmark_test.go @@ -24,7 +24,7 @@ import ( "k8s.io/autoscaler/cluster-autoscaler/simulator/clustersnapshot" drasnapshot "k8s.io/autoscaler/cluster-autoscaler/simulator/dynamicresources/snapshot" - "k8s.io/autoscaler/cluster-autoscaler/simulator/framework" + schedulerframework "k8s.io/kubernetes/pkg/scheduler/framework" ) func BenchmarkBuildNodeInfoList(b *testing.B) { @@ -54,7 +54,9 @@ func BenchmarkBuildNodeInfoList(b *testing.B) { } deltaStore.Fork() for _, node := range nodes[tc.nodeCount:] { - if err := deltaStore.AddNodeInfo(framework.NewTestNodeInfo(node)); err != nil { + schedNodeInfo := schedulerframework.NewNodeInfo() + schedNodeInfo.SetNode(node) + if err := deltaStore.AddSchedulerNodeInfo(schedNodeInfo); err != nil { assert.NoError(b, err) } }