From bcc8d9e51b42807705a67bf931b6ecb70ca041ba Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Kuba=20Tu=C5=BCnik?= Date: Thu, 21 Nov 2024 17:37:26 +0100 Subject: [PATCH] CA: move NodeInfo methods from SnapshotBase to ClusterSnapshot All the NodeInfo methods have to take DRA into account, and the logic for that will be the same for different SnapshotBase implementations. Instead of duplicating the new logic in Basic and Delta, the methods are moved to ClusterSnapshot and the logic will be implemented once in PredicateSnapshot. PredicateSnapshot will use the DRA Snapshot exposed by its SnapshotBase to implement these methods. The DRA Snapshot has to be stored in the SnapshotBase layer, as we need to be able to fork/commit/revert it. Lower-level methods for adding/removing just the schedulerframework.NodeInfo parts are added to SnapshotBase. PredicateSnapshot utilizes these methods to implement AddNodeInfo and RemoveNodeInfo. This should be a no-op, it's just a refactor. --- .../simulator/clustersnapshot/base/basic.go | 30 +++---------- .../simulator/clustersnapshot/base/delta.go | 30 +++---------- .../base/delta_benchmark_test.go | 6 ++- .../clustersnapshot/clustersnapshot.go | 33 ++++++++------ .../predicate/plugin_runner.go | 18 ++++---- .../predicate/plugin_runner_test.go | 43 ++++++++++--------- .../predicate/predicate_snapshot.go | 37 +++++++++++++++- 7 files changed, 101 insertions(+), 96 deletions(-) diff --git a/cluster-autoscaler/simulator/clustersnapshot/base/basic.go b/cluster-autoscaler/simulator/clustersnapshot/base/basic.go index 78c3df2b1a72..2d0bdd036995 100644 --- a/cluster-autoscaler/simulator/clustersnapshot/base/basic.go +++ b/cluster-autoscaler/simulator/clustersnapshot/base/basic.go @@ -22,7 +22,6 @@ import ( apiv1 "k8s.io/api/core/v1" "k8s.io/autoscaler/cluster-autoscaler/simulator/clustersnapshot" drasnapshot "k8s.io/autoscaler/cluster-autoscaler/simulator/dynamicresources/snapshot" - "k8s.io/autoscaler/cluster-autoscaler/simulator/framework" "k8s.io/klog/v2" schedulerframework "k8s.io/kubernetes/pkg/scheduler/framework" ) @@ -213,31 +212,12 @@ func (snapshot *BasicSnapshotBase) DraSnapshot() drasnapshot.Snapshot { return snapshot.getInternalData().draSnapshot } -// GetNodeInfo gets a NodeInfo. -func (snapshot *BasicSnapshotBase) GetNodeInfo(nodeName string) (*framework.NodeInfo, error) { - schedNodeInfo, err := snapshot.getInternalData().getNodeInfo(nodeName) - if err != nil { - return nil, err - } - return framework.WrapSchedulerNodeInfo(schedNodeInfo, nil, nil), nil -} - -// ListNodeInfos lists NodeInfos. -func (snapshot *BasicSnapshotBase) ListNodeInfos() ([]*framework.NodeInfo, error) { - schedNodeInfos := snapshot.getInternalData().listNodeInfos() - var result []*framework.NodeInfo - for _, schedNodeInfo := range schedNodeInfos { - result = append(result, framework.WrapSchedulerNodeInfo(schedNodeInfo, nil, nil)) - } - return result, nil -} - -// AddNodeInfo adds a NodeInfo. -func (snapshot *BasicSnapshotBase) AddNodeInfo(nodeInfo *framework.NodeInfo) error { +// AddSchedulerNodeInfo adds a NodeInfo. +func (snapshot *BasicSnapshotBase) AddSchedulerNodeInfo(nodeInfo *schedulerframework.NodeInfo) error { if err := snapshot.getInternalData().addNode(nodeInfo.Node()); err != nil { return err } - for _, podInfo := range nodeInfo.Pods() { + for _, podInfo := range nodeInfo.Pods { if err := snapshot.getInternalData().addPod(podInfo.Pod, nodeInfo.Node().Name); err != nil { return err } @@ -267,8 +247,8 @@ func (snapshot *BasicSnapshotBase) SetClusterState(nodes []*apiv1.Node, schedule return nil } -// RemoveNodeInfo removes nodes (and pods scheduled to it) from the snapshot. -func (snapshot *BasicSnapshotBase) RemoveNodeInfo(nodeName string) error { +// RemoveSchedulerNodeInfo removes nodes (and pods scheduled to it) from the snapshot. +func (snapshot *BasicSnapshotBase) RemoveSchedulerNodeInfo(nodeName string) error { return snapshot.getInternalData().removeNodeInfo(nodeName) } diff --git a/cluster-autoscaler/simulator/clustersnapshot/base/delta.go b/cluster-autoscaler/simulator/clustersnapshot/base/delta.go index 7728211f4dd5..51ec83ba02eb 100644 --- a/cluster-autoscaler/simulator/clustersnapshot/base/delta.go +++ b/cluster-autoscaler/simulator/clustersnapshot/base/delta.go @@ -22,7 +22,6 @@ import ( apiv1 "k8s.io/api/core/v1" "k8s.io/autoscaler/cluster-autoscaler/simulator/clustersnapshot" drasnapshot "k8s.io/autoscaler/cluster-autoscaler/simulator/dynamicresources/snapshot" - "k8s.io/autoscaler/cluster-autoscaler/simulator/framework" "k8s.io/klog/v2" schedulerframework "k8s.io/kubernetes/pkg/scheduler/framework" ) @@ -417,31 +416,12 @@ func (snapshot *DeltaSnapshotBase) DraSnapshot() drasnapshot.Snapshot { return drasnapshot.Snapshot{} } -// GetNodeInfo gets a NodeInfo. -func (snapshot *DeltaSnapshotBase) GetNodeInfo(nodeName string) (*framework.NodeInfo, error) { - schedNodeInfo, err := snapshot.getNodeInfo(nodeName) - if err != nil { - return nil, err - } - return framework.WrapSchedulerNodeInfo(schedNodeInfo, nil, nil), nil -} - -// ListNodeInfos lists NodeInfos. -func (snapshot *DeltaSnapshotBase) ListNodeInfos() ([]*framework.NodeInfo, error) { - schedNodeInfos := snapshot.data.getNodeInfoList() - var result []*framework.NodeInfo - for _, schedNodeInfo := range schedNodeInfos { - result = append(result, framework.WrapSchedulerNodeInfo(schedNodeInfo, nil, nil)) - } - return result, nil -} - -// AddNodeInfo adds a NodeInfo. -func (snapshot *DeltaSnapshotBase) AddNodeInfo(nodeInfo *framework.NodeInfo) error { +// AddSchedulerNodeInfo adds a NodeInfo. +func (snapshot *DeltaSnapshotBase) AddSchedulerNodeInfo(nodeInfo *schedulerframework.NodeInfo) error { if err := snapshot.data.addNode(nodeInfo.Node()); err != nil { return err } - for _, podInfo := range nodeInfo.Pods() { + for _, podInfo := range nodeInfo.Pods { if err := snapshot.data.addPod(podInfo.Pod, nodeInfo.Node().Name); err != nil { return err } @@ -471,8 +451,8 @@ func (snapshot *DeltaSnapshotBase) SetClusterState(nodes []*apiv1.Node, schedule return nil } -// RemoveNodeInfo removes nodes (and pods scheduled to it) from the snapshot. -func (snapshot *DeltaSnapshotBase) RemoveNodeInfo(nodeName string) error { +// RemoveSchedulerNodeInfo removes nodes (and pods scheduled to it) from the snapshot. +func (snapshot *DeltaSnapshotBase) RemoveSchedulerNodeInfo(nodeName string) error { return snapshot.data.removeNodeInfo(nodeName) } diff --git a/cluster-autoscaler/simulator/clustersnapshot/base/delta_benchmark_test.go b/cluster-autoscaler/simulator/clustersnapshot/base/delta_benchmark_test.go index d7be53a3e41f..915a7a6cfcf8 100644 --- a/cluster-autoscaler/simulator/clustersnapshot/base/delta_benchmark_test.go +++ b/cluster-autoscaler/simulator/clustersnapshot/base/delta_benchmark_test.go @@ -24,7 +24,7 @@ import ( "k8s.io/autoscaler/cluster-autoscaler/simulator/clustersnapshot" drasnapshot "k8s.io/autoscaler/cluster-autoscaler/simulator/dynamicresources/snapshot" - "k8s.io/autoscaler/cluster-autoscaler/simulator/framework" + schedulerframework "k8s.io/kubernetes/pkg/scheduler/framework" ) func BenchmarkBuildNodeInfoList(b *testing.B) { @@ -54,7 +54,9 @@ func BenchmarkBuildNodeInfoList(b *testing.B) { } clusterSnapshot.Fork() for _, node := range nodes[tc.nodeCount:] { - if err := clusterSnapshot.AddNodeInfo(framework.NewTestNodeInfo(node)); err != nil { + schedNodeInfo := schedulerframework.NewNodeInfo() + schedNodeInfo.SetNode(node) + if err := clusterSnapshot.AddSchedulerNodeInfo(schedNodeInfo); err != nil { assert.NoError(b, err) } } diff --git a/cluster-autoscaler/simulator/clustersnapshot/clustersnapshot.go b/cluster-autoscaler/simulator/clustersnapshot/clustersnapshot.go index 0e93eefb0fd4..2f61d1f594f5 100644 --- a/cluster-autoscaler/simulator/clustersnapshot/clustersnapshot.go +++ b/cluster-autoscaler/simulator/clustersnapshot/clustersnapshot.go @@ -23,6 +23,7 @@ import ( drasnapshot "k8s.io/autoscaler/cluster-autoscaler/simulator/dynamicresources/snapshot" "k8s.io/autoscaler/cluster-autoscaler/simulator/framework" "k8s.io/klog/v2" + schedulerframework "k8s.io/kubernetes/pkg/scheduler/framework" ) // ClusterSnapshot is abstraction of cluster state used for predicate simulations. @@ -30,6 +31,19 @@ import ( type ClusterSnapshot interface { SnapshotBase + // AddNodeInfo adds the given NodeInfo to the snapshot without checking scheduler predicates. The Node and the Pods are added, + // as well as any DRA objects passed along them. + AddNodeInfo(nodeInfo *framework.NodeInfo) error + // RemoveNodeInfo removes the given NodeInfo from the snapshot The Node and the Pods are removed, as well as + // any DRA objects owned by them. + RemoveNodeInfo(nodeName string) error + // GetNodeInfo returns an internal NodeInfo for a given Node - all information about the Node tracked in the snapshot. + // This means the Node itself, its scheduled Pods, as well as all relevant DRA objects. The internal NodeInfos + // obtained via this method should always be used in CA code instead of directly using *schedulerframework.NodeInfo. + GetNodeInfo(nodeName string) (*framework.NodeInfo, error) + // ListNodeInfos returns internal NodeInfos for all Nodes tracked in the snapshot. See the comment on GetNodeInfo. + ListNodeInfos() ([]*framework.NodeInfo, error) + // SchedulePod tries to schedule the given Pod on the Node with the given name inside the snapshot, // checking scheduling predicates. The pod is only scheduled if the predicates pass. If the pod is scheduled, // all relevant DRA objects are modified to reflect that. Returns nil if the pod got scheduled, and a non-nil @@ -66,18 +80,13 @@ type SnapshotBase interface { // ForceRemovePod removes the given Pod (and all DRA objects it owns) from the snapshot. ForceRemovePod(namespace string, podName string, nodeName string) error - // AddNodeInfo adds the given NodeInfo to the snapshot without checking scheduler predicates. The Node and the Pods are added, - // as well as any DRA objects passed along them. - AddNodeInfo(nodeInfo *framework.NodeInfo) error - // RemoveNodeInfo removes the given NodeInfo from the snapshot The Node and the Pods are removed, as well as - // any DRA objects owned by them. - RemoveNodeInfo(nodeName string) error - // GetNodeInfo returns an internal NodeInfo for a given Node - all information about the Node tracked in the snapshot. - // This means the Node itself, its scheduled Pods, as well as all relevant DRA objects. The internal NodeInfos - // obtained via this method should always be used in CA code instead of directly using *schedulerframework.NodeInfo. - GetNodeInfo(nodeName string) (*framework.NodeInfo, error) - // ListNodeInfos returns internal NodeInfos for all Nodes tracked in the snapshot. See the comment on GetNodeInfo. - ListNodeInfos() ([]*framework.NodeInfo, error) + // AddSchedulerNodeInfo adds the given schedulerframework.NodeInfo to the snapshot without checking scheduler predicates, and + // without taking DRA objects into account. This shouldn't be used outside the clustersnapshot pkg, use ClusterSnapshot.AddNodeInfo() + // instead. + AddSchedulerNodeInfo(nodeInfo *schedulerframework.NodeInfo) error + // RemoveSchedulerNodeInfo removes the given schedulerframework.NodeInfo from the snapshot without taking DRA objects into account. This shouldn't + // be used outside the clustersnapshot pkg, use ClusterSnapshot.RemoveNodeInfo() instead. + RemoveSchedulerNodeInfo(nodeName string) error // DraSnapshot returns an interface that allows accessing and modifying the DRA objects in the snapshot. DraSnapshot() drasnapshot.Snapshot diff --git a/cluster-autoscaler/simulator/clustersnapshot/predicate/plugin_runner.go b/cluster-autoscaler/simulator/clustersnapshot/predicate/plugin_runner.go index 82b49b54670d..29d291221cee 100644 --- a/cluster-autoscaler/simulator/clustersnapshot/predicate/plugin_runner.go +++ b/cluster-autoscaler/simulator/clustersnapshot/predicate/plugin_runner.go @@ -30,14 +30,14 @@ import ( // SchedulerPluginRunner can be used to run various phases of scheduler plugins through the scheduler framework. type SchedulerPluginRunner struct { - fwHandle *framework.Handle - snapshotBase clustersnapshot.SnapshotBase - lastIndex int + fwHandle *framework.Handle + snapshot clustersnapshot.ClusterSnapshot + lastIndex int } // NewSchedulerPluginRunner builds a SchedulerPluginRunner. -func NewSchedulerPluginRunner(fwHandle *framework.Handle, snapshotBase clustersnapshot.SnapshotBase) *SchedulerPluginRunner { - return &SchedulerPluginRunner{fwHandle: fwHandle, snapshotBase: snapshotBase} +func NewSchedulerPluginRunner(fwHandle *framework.Handle, snapshot clustersnapshot.ClusterSnapshot) *SchedulerPluginRunner { + return &SchedulerPluginRunner{fwHandle: fwHandle, snapshot: snapshot} } // RunFiltersUntilPassingNode runs the scheduler framework PreFilter phase once, and then keeps running the Filter phase for all nodes in the cluster that match the provided @@ -45,12 +45,12 @@ func NewSchedulerPluginRunner(fwHandle *framework.Handle, snapshotBase clustersn // // The node iteration always starts from the next Node from the last Node that was found by this method. TODO: Extract the iteration strategy out of SchedulerPluginRunner. func (p *SchedulerPluginRunner) RunFiltersUntilPassingNode(pod *apiv1.Pod, nodeMatches func(*framework.NodeInfo) bool) (string, clustersnapshot.SchedulingError) { - nodeInfosList, err := p.snapshotBase.ListNodeInfos() + nodeInfosList, err := p.snapshot.ListNodeInfos() if err != nil { return "", clustersnapshot.NewSchedulingInternalError(pod, "ClusterSnapshot not provided") } - p.fwHandle.DelegatingLister.UpdateDelegate(p.snapshotBase) + p.fwHandle.DelegatingLister.UpdateDelegate(p.snapshot) defer p.fwHandle.DelegatingLister.ResetDelegate() state := schedulerframework.NewCycleState() @@ -85,12 +85,12 @@ func (p *SchedulerPluginRunner) RunFiltersUntilPassingNode(pod *apiv1.Pod, nodeM // RunFiltersOnNode runs the scheduler framework PreFilter and Filter phases to check if the given pod can be scheduled on the given node. func (p *SchedulerPluginRunner) RunFiltersOnNode(pod *apiv1.Pod, nodeName string) clustersnapshot.SchedulingError { - nodeInfo, err := p.snapshotBase.GetNodeInfo(nodeName) + nodeInfo, err := p.snapshot.GetNodeInfo(nodeName) if err != nil { return clustersnapshot.NewSchedulingInternalError(pod, fmt.Sprintf("error obtaining NodeInfo for name %q: %v", nodeName, err)) } - p.fwHandle.DelegatingLister.UpdateDelegate(p.snapshotBase) + p.fwHandle.DelegatingLister.UpdateDelegate(p.snapshot) defer p.fwHandle.DelegatingLister.ResetDelegate() state := schedulerframework.NewCycleState() diff --git a/cluster-autoscaler/simulator/clustersnapshot/predicate/plugin_runner_test.go b/cluster-autoscaler/simulator/clustersnapshot/predicate/plugin_runner_test.go index 316b269889b1..1c84862c412c 100644 --- a/cluster-autoscaler/simulator/clustersnapshot/predicate/plugin_runner_test.go +++ b/cluster-autoscaler/simulator/clustersnapshot/predicate/plugin_runner_test.go @@ -139,11 +139,9 @@ func TestRunFiltersOnNode(t *testing.T) { } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - snapshotBase := base.NewBasicSnapshotBase() - err := snapshotBase.AddNodeInfo(framework.NewTestNodeInfo(tt.node, tt.scheduledPods...)) + pluginRunner, snapshot, err := newTestPluginRunnerAndSnapshot(tt.customConfig) assert.NoError(t, err) - - pluginRunner, err := newTestPluginRunner(snapshotBase, tt.customConfig) + err = snapshot.AddNodeInfo(framework.NewTestNodeInfo(tt.node, tt.scheduledPods...)) assert.NoError(t, err) predicateError := pluginRunner.RunFiltersOnNode(tt.testPod, tt.node.Name) @@ -235,15 +233,14 @@ func TestRunFilterUntilPassingNode(t *testing.T) { }, } - snapshotBase := base.NewBasicSnapshotBase() - err = snapshotBase.AddNodeInfo(framework.NewTestNodeInfo(n1000)) - assert.NoError(t, err) - err = snapshotBase.AddNodeInfo(framework.NewTestNodeInfo(n2000)) - assert.NoError(t, err) - for _, tc := range testCases { t.Run(tc.name, func(t *testing.T) { - pluginRunner, err := newTestPluginRunner(snapshotBase, tc.customConfig) + pluginRunner, snapshot, err := newTestPluginRunnerAndSnapshot(tc.customConfig) + assert.NoError(t, err) + + err = snapshot.AddNodeInfo(framework.NewTestNodeInfo(n1000)) + assert.NoError(t, err) + err = snapshot.AddNodeInfo(framework.NewTestNodeInfo(n2000)) assert.NoError(t, err) nodeName, err := pluginRunner.RunFiltersUntilPassingNode(tc.pod, func(info *framework.NodeInfo) bool { return true }) @@ -274,13 +271,13 @@ func TestDebugInfo(t *testing.T) { } SetNodeReadyState(node1, true, time.Time{}) - clusterSnapshot := base.NewBasicSnapshotBase() - err := clusterSnapshot.AddNodeInfo(framework.NewTestNodeInfo(node1)) + // with default predicate checker + defaultPluginnRunner, clusterSnapshot, err := newTestPluginRunnerAndSnapshot(nil) assert.NoError(t, err) - // with default predicate checker - defaultPluginnRunner, err := newTestPluginRunner(clusterSnapshot, nil) + err = clusterSnapshot.AddNodeInfo(framework.NewTestNodeInfo(node1)) assert.NoError(t, err) + predicateErr := defaultPluginnRunner.RunFiltersOnNode(p1, "n1") assert.NotNil(t, predicateErr) assert.Contains(t, predicateErr.FailingPredicateReasons(), "node(s) had untolerated taint {SomeTaint: WhyNot?}") @@ -305,25 +302,29 @@ func TestDebugInfo(t *testing.T) { customConfig, err := scheduler.ConfigFromPath(customConfigFile) assert.NoError(t, err) - customPluginnRunner, err := newTestPluginRunner(clusterSnapshot, customConfig) + customPluginnRunner, clusterSnapshot, err := newTestPluginRunnerAndSnapshot(customConfig) assert.NoError(t, err) + + err = clusterSnapshot.AddNodeInfo(framework.NewTestNodeInfo(node1)) + assert.NoError(t, err) + predicateErr = customPluginnRunner.RunFiltersOnNode(p1, "n1") assert.Nil(t, predicateErr) } -// newTestPluginRunner builds test version of SchedulerPluginRunner. -func newTestPluginRunner(snapshotBase clustersnapshot.SnapshotBase, schedConfig *config.KubeSchedulerConfiguration) (*SchedulerPluginRunner, error) { +func newTestPluginRunnerAndSnapshot(schedConfig *config.KubeSchedulerConfiguration) (*SchedulerPluginRunner, clustersnapshot.ClusterSnapshot, error) { if schedConfig == nil { defaultConfig, err := scheduler_config_latest.Default() if err != nil { - return nil, err + return nil, nil, err } schedConfig = defaultConfig } fwHandle, err := framework.NewHandle(informers.NewSharedInformerFactory(clientsetfake.NewSimpleClientset(), 0), schedConfig, true) if err != nil { - return nil, err + return nil, nil, err } - return NewSchedulerPluginRunner(fwHandle, snapshotBase), nil + snapshot := NewPredicateSnapshot(base.NewBasicSnapshotBase(), fwHandle, false) + return NewSchedulerPluginRunner(fwHandle, snapshot), snapshot, nil } diff --git a/cluster-autoscaler/simulator/clustersnapshot/predicate/predicate_snapshot.go b/cluster-autoscaler/simulator/clustersnapshot/predicate/predicate_snapshot.go index 76a37a1e1fe3..7b119b077730 100644 --- a/cluster-autoscaler/simulator/clustersnapshot/predicate/predicate_snapshot.go +++ b/cluster-autoscaler/simulator/clustersnapshot/predicate/predicate_snapshot.go @@ -32,11 +32,44 @@ type PredicateSnapshot struct { // NewPredicateSnapshot builds a PredicateSnapshot. func NewPredicateSnapshot(snapshotBase clustersnapshot.SnapshotBase, fwHandle *framework.Handle, draEnabled bool) *PredicateSnapshot { - return &PredicateSnapshot{ + snapshot := &PredicateSnapshot{ SnapshotBase: snapshotBase, - pluginRunner: NewSchedulerPluginRunner(fwHandle, snapshotBase), draEnabled: draEnabled, } + snapshot.pluginRunner = NewSchedulerPluginRunner(fwHandle, snapshot) + return snapshot +} + +// GetNodeInfo returns an internal NodeInfo wrapping the relevant schedulerframework.NodeInfo. +func (s *PredicateSnapshot) GetNodeInfo(nodeName string) (*framework.NodeInfo, error) { + schedNodeInfo, err := s.SnapshotBase.NodeInfos().Get(nodeName) + if err != nil { + return nil, err + } + return framework.WrapSchedulerNodeInfo(schedNodeInfo, nil, nil), nil +} + +// ListNodeInfos returns internal NodeInfos wrapping all schedulerframework.NodeInfos in the snapshot. +func (s *PredicateSnapshot) ListNodeInfos() ([]*framework.NodeInfo, error) { + schedNodeInfos, err := s.SnapshotBase.NodeInfos().List() + if err != nil { + return nil, err + } + var result []*framework.NodeInfo + for _, schedNodeInfo := range schedNodeInfos { + result = append(result, framework.WrapSchedulerNodeInfo(schedNodeInfo, nil, nil)) + } + return result, nil +} + +// AddNodeInfo adds the provided internal NodeInfo to the snapshot. +func (s *PredicateSnapshot) AddNodeInfo(nodeInfo *framework.NodeInfo) error { + return s.SnapshotBase.AddSchedulerNodeInfo(nodeInfo.ToScheduler()) +} + +// RemoveNodeInfo removes a NodeInfo matching the provided nodeName from the snapshot. +func (s *PredicateSnapshot) RemoveNodeInfo(nodeName string) error { + return s.SnapshotBase.RemoveSchedulerNodeInfo(nodeName) } // SchedulePod adds pod to the snapshot and schedules it to given node.