diff --git a/cluster-autoscaler/core/podlistprocessor/filter_out_expendable.go b/cluster-autoscaler/core/podlistprocessor/filter_out_expendable.go index 0ec929814a1a..550f8a10520f 100644 --- a/cluster-autoscaler/core/podlistprocessor/filter_out_expendable.go +++ b/cluster-autoscaler/core/podlistprocessor/filter_out_expendable.go @@ -23,7 +23,7 @@ import ( "k8s.io/autoscaler/cluster-autoscaler/context" core_utils "k8s.io/autoscaler/cluster-autoscaler/core/utils" caerrors "k8s.io/autoscaler/cluster-autoscaler/utils/errors" - klog "k8s.io/klog/v2" + "k8s.io/klog/v2" ) type filterOutExpendable struct { @@ -56,7 +56,7 @@ func (p *filterOutExpendable) Process(context *context.AutoscalingContext, pods // CA logic from before migration to scheduler framework. So let's keep it for now func (p *filterOutExpendable) addPreemptingPodsToSnapshot(pods []*apiv1.Pod, ctx *context.AutoscalingContext) error { for _, p := range pods { - if err := ctx.ClusterSnapshot.AddPod(p, p.Status.NominatedNodeName); err != nil { + if err := ctx.ClusterSnapshot.ForceAddPod(p, p.Status.NominatedNodeName); err != nil { klog.Errorf("Failed to update snapshot with pod %s/%s waiting for preemption: %v", p.Namespace, p.Name, err) return caerrors.ToAutoscalerError(caerrors.InternalError, err) } diff --git a/cluster-autoscaler/core/static_autoscaler.go b/cluster-autoscaler/core/static_autoscaler.go index a6833e213b38..6902b13df35c 100644 --- a/cluster-autoscaler/core/static_autoscaler.go +++ b/cluster-autoscaler/core/static_autoscaler.go @@ -464,7 +464,7 @@ func (a *StaticAutoscaler) RunOnce(currentTime time.Time) caerrors.AutoscalerErr allNodes = subtractNodesByName(allNodes, allRegisteredUpcoming) // Remove the nodes from the snapshot as well so that the state is consistent. 
for _, notStartedNodeName := range allRegisteredUpcoming { - err := a.ClusterSnapshot.RemoveNode(notStartedNodeName) + err := a.ClusterSnapshot.RemoveNodeInfo(notStartedNodeName) if err != nil { klog.Errorf("Failed to remove NotStarted node %s from cluster snapshot: %v", notStartedNodeName, err) // ErrNodeNotFound shouldn't happen (so it needs to be logged above if it does), but what we care about here is that the @@ -660,16 +660,16 @@ func (a *StaticAutoscaler) addUpcomingNodesToClusterSnapshot(upcomingCounts map[ nodeGroups := a.nodeGroupsById() upcomingNodeGroups := make(map[string]int) upcomingNodesFromUpcomingNodeGroups := 0 - for nodeGroupName, upcomingNodes := range getUpcomingNodeInfos(upcomingCounts, nodeInfosForGroups) { + for nodeGroupName, upcomingNodeInfos := range getUpcomingNodeInfos(upcomingCounts, nodeInfosForGroups) { nodeGroup := nodeGroups[nodeGroupName] if nodeGroup == nil { return fmt.Errorf("failed to find node group: %s", nodeGroupName) } isUpcomingNodeGroup := a.processors.AsyncNodeGroupStateChecker.IsUpcoming(nodeGroup) - for _, upcomingNode := range upcomingNodes { - err := a.ClusterSnapshot.AddNodeInfo(upcomingNode) + for _, upcomingNodeInfo := range upcomingNodeInfos { + err := a.ClusterSnapshot.AddNodeInfo(upcomingNodeInfo) if err != nil { - return fmt.Errorf("failed to add upcoming node %s to cluster snapshot: %w", upcomingNode.Node().Name, err) + return fmt.Errorf("failed to add upcoming node %s to cluster snapshot: %w", upcomingNodeInfo.Node().Name, err) } if isUpcomingNodeGroup { upcomingNodesFromUpcomingNodeGroups++ diff --git a/cluster-autoscaler/estimator/binpacking_estimator.go b/cluster-autoscaler/estimator/binpacking_estimator.go index 46470609f50d..55e1de431997 100644 --- a/cluster-autoscaler/estimator/binpacking_estimator.go +++ b/cluster-autoscaler/estimator/binpacking_estimator.go @@ -25,7 +25,7 @@ import ( "k8s.io/autoscaler/cluster-autoscaler/simulator/framework" 
"k8s.io/autoscaler/cluster-autoscaler/simulator/predicatechecker" "k8s.io/autoscaler/cluster-autoscaler/utils/scheduler" - klog "k8s.io/klog/v2" + "k8s.io/klog/v2" ) // BinpackingNodeEstimator estimates the number of needed nodes to handle the given amount of pods. @@ -225,7 +225,7 @@ func (e *BinpackingNodeEstimator) tryToAddNode( pod *apiv1.Pod, nodeName string, ) error { - if err := e.clusterSnapshot.AddPod(pod, nodeName); err != nil { + if err := e.clusterSnapshot.ForceAddPod(pod, nodeName); err != nil { return fmt.Errorf("Error adding pod %v.%v to node %v in ClusterSnapshot; %v", pod.Namespace, pod.Name, nodeName, err) } estimationState.newNodesWithPods[nodeName] = true diff --git a/cluster-autoscaler/simulator/cluster.go b/cluster-autoscaler/simulator/cluster.go index 6855ae5efb95..e81d9ecea0ff 100644 --- a/cluster-autoscaler/simulator/cluster.go +++ b/cluster-autoscaler/simulator/cluster.go @@ -32,7 +32,7 @@ import ( kube_util "k8s.io/autoscaler/cluster-autoscaler/utils/kubernetes" "k8s.io/autoscaler/cluster-autoscaler/utils/tpu" - klog "k8s.io/klog/v2" + "k8s.io/klog/v2" ) // NodeToBeRemoved contain information about a node that can be removed. 
@@ -223,7 +223,7 @@ func (r *RemovalSimulator) findPlaceFor(removedNode string, pods []*apiv1.Pod, n // remove pods from clusterSnapshot first for _, pod := range pods { - if err := r.clusterSnapshot.RemovePod(pod.Namespace, pod.Name, removedNode); err != nil { + if err := r.clusterSnapshot.ForceRemovePod(pod.Namespace, pod.Name, removedNode); err != nil { // just log error klog.Errorf("Simulating removal of %s/%s return error; %v", pod.Namespace, pod.Name, err) } diff --git a/cluster-autoscaler/simulator/clustersnapshot/basic.go b/cluster-autoscaler/simulator/clustersnapshot/basic.go index e85ea865b40b..38b29de9e306 100644 --- a/cluster-autoscaler/simulator/clustersnapshot/basic.go +++ b/cluster-autoscaler/simulator/clustersnapshot/basic.go @@ -153,7 +153,7 @@ func (data *internalBasicSnapshotData) addNode(node *apiv1.Node) error { return nil } -func (data *internalBasicSnapshotData) removeNode(nodeName string) error { +func (data *internalBasicSnapshotData) removeNodeInfo(nodeName string) error { if _, found := data.nodeInfoMap[nodeName]; !found { return ErrNodeNotFound } @@ -253,18 +253,18 @@ func (snapshot *BasicClusterSnapshot) SetClusterState(nodes []*apiv1.Node, sched return nil } -// RemoveNode removes nodes (and pods scheduled to it) from the snapshot. -func (snapshot *BasicClusterSnapshot) RemoveNode(nodeName string) error { - return snapshot.getInternalData().removeNode(nodeName) +// RemoveNodeInfo removes nodes (and pods scheduled to it) from the snapshot. +func (snapshot *BasicClusterSnapshot) RemoveNodeInfo(nodeName string) error { + return snapshot.getInternalData().removeNodeInfo(nodeName) } -// AddPod adds pod to the snapshot and schedules it to given node. -func (snapshot *BasicClusterSnapshot) AddPod(pod *apiv1.Pod, nodeName string) error { +// ForceAddPod adds pod to the snapshot and schedules it to given node. 
+func (snapshot *BasicClusterSnapshot) ForceAddPod(pod *apiv1.Pod, nodeName string) error { return snapshot.getInternalData().addPod(pod, nodeName) } -// RemovePod removes pod from the snapshot. -func (snapshot *BasicClusterSnapshot) RemovePod(namespace, podName, nodeName string) error { +// ForceRemovePod removes pod from the snapshot. +func (snapshot *BasicClusterSnapshot) ForceRemovePod(namespace, podName, nodeName string) error { return snapshot.getInternalData().removePod(namespace, podName, nodeName) } diff --git a/cluster-autoscaler/simulator/clustersnapshot/clustersnapshot.go b/cluster-autoscaler/simulator/clustersnapshot/clustersnapshot.go index 199f1c74f116..969986ee00a9 100644 --- a/cluster-autoscaler/simulator/clustersnapshot/clustersnapshot.go +++ b/cluster-autoscaler/simulator/clustersnapshot/clustersnapshot.go @@ -34,16 +34,17 @@ type ClusterSnapshot interface { // with the provided data. scheduledPods are correlated to their Nodes based on spec.NodeName. SetClusterState(nodes []*apiv1.Node, scheduledPods []*apiv1.Pod) error - // RemoveNode removes nodes (and pods scheduled to it) from the snapshot. - RemoveNode(nodeName string) error - // AddPod adds pod to the snapshot and schedules it to given node. - AddPod(pod *apiv1.Pod, nodeName string) error - // RemovePod removes pod from the snapshot. - RemovePod(namespace string, podName string, nodeName string) error + // ForceAddPod adds the given Pod to the Node with the given nodeName inside the snapshot. + ForceAddPod(pod *apiv1.Pod, nodeName string) error + // ForceRemovePod removes the given Pod (and all DRA objects it owns) from the snapshot. + ForceRemovePod(namespace string, podName string, nodeName string) error // AddNodeInfo adds the given NodeInfo to the snapshot. The Node and the Pods are added, as well as // any DRA objects passed along them. 
AddNodeInfo(nodeInfo *framework.NodeInfo) error + // RemoveNodeInfo removes the given NodeInfo from the snapshot. The Node and the Pods are removed, as well as + // any DRA objects owned by them. + RemoveNodeInfo(nodeName string) error // GetNodeInfo returns an internal NodeInfo for a given Node - all information about the Node tracked in the snapshot. // This means the Node itself, its scheduled Pods, as well as all relevant DRA objects. The internal NodeInfos // obtained via this method should always be used in CA code instead of directly using *schedulerframework.NodeInfo. diff --git a/cluster-autoscaler/simulator/clustersnapshot/clustersnapshot_benchmark_test.go b/cluster-autoscaler/simulator/clustersnapshot/clustersnapshot_benchmark_test.go index 091b7f92f539..fc1c519c911d 100644 --- a/cluster-autoscaler/simulator/clustersnapshot/clustersnapshot_benchmark_test.go +++ b/cluster-autoscaler/simulator/clustersnapshot/clustersnapshot_benchmark_test.go @@ -133,7 +133,7 @@ func BenchmarkAddPods(b *testing.B) { err := clusterSnapshot.SetClusterState(nodes, nil) assert.NoError(b, err) b.ResetTimer() - b.Run(fmt.Sprintf("%s: AddPod() 30*%d", snapshotName, tc), func(b *testing.B) { + b.Run(fmt.Sprintf("%s: ForceAddPod() 30*%d", snapshotName, tc), func(b *testing.B) { for i := 0; i < b.N; i++ { b.StopTimer() @@ -143,7 +143,7 @@ func BenchmarkAddPods(b *testing.B) { } b.StartTimer() for _, pod := range pods { - err = clusterSnapshot.AddPod(pod, pod.Spec.NodeName) + err = clusterSnapshot.ForceAddPod(pod, pod.Spec.NodeName) if err != nil { assert.NoError(b, err) } diff --git a/cluster-autoscaler/simulator/clustersnapshot/clustersnapshot_test.go b/cluster-autoscaler/simulator/clustersnapshot/clustersnapshot_test.go index f951cf1e4a94..5b564f64ecdd 100644 --- a/cluster-autoscaler/simulator/clustersnapshot/clustersnapshot_test.go +++ b/cluster-autoscaler/simulator/clustersnapshot/clustersnapshot_test.go @@ -115,22 +115,22 @@ func validTestCases(t *testing.T) 
[]modificationTestCase { }, }, { - name: "remove node", + name: "remove nodeInfo", state: snapshotState{ nodes: []*apiv1.Node{node}, }, op: func(snapshot ClusterSnapshot) { - err := snapshot.RemoveNode(node.Name) + err := snapshot.RemoveNodeInfo(node.Name) assert.NoError(t, err) }, }, { - name: "remove node, then add it back", + name: "remove nodeInfo, then add it back", state: snapshotState{ nodes: []*apiv1.Node{node}, }, op: func(snapshot ClusterSnapshot) { - err := snapshot.RemoveNode(node.Name) + err := snapshot.RemoveNodeInfo(node.Name) assert.NoError(t, err) err = snapshot.AddNodeInfo(framework.NewTestNodeInfo(node)) @@ -141,14 +141,14 @@ func validTestCases(t *testing.T) []modificationTestCase { }, }, { - name: "add pod, then remove node", + name: "add pod, then remove nodeInfo", state: snapshotState{ nodes: []*apiv1.Node{node}, }, op: func(snapshot ClusterSnapshot) { - err := snapshot.AddPod(pod, node.Name) + err := snapshot.ForceAddPod(pod, node.Name) assert.NoError(t, err) - err = snapshot.RemoveNode(node.Name) + err = snapshot.RemoveNodeInfo(node.Name) assert.NoError(t, err) }, }, @@ -326,7 +326,7 @@ func TestClear(t *testing.T) { } for _, pod := range extraPods { - err := snapshot.AddPod(pod, pod.Spec.NodeName) + err := snapshot.ForceAddPod(pod, pod.Spec.NodeName) assert.NoError(t, err) } @@ -349,17 +349,17 @@ func TestNode404(t *testing.T) { op func(ClusterSnapshot) error }{ {"add pod", func(snapshot ClusterSnapshot) error { - return snapshot.AddPod(BuildTestPod("p1", 0, 0), "node") + return snapshot.ForceAddPod(BuildTestPod("p1", 0, 0), "node") }}, {"remove pod", func(snapshot ClusterSnapshot) error { - return snapshot.RemovePod("default", "p1", "node") + return snapshot.ForceRemovePod("default", "p1", "node") }}, {"get node", func(snapshot ClusterSnapshot) error { _, err := snapshot.NodeInfos().Get("node") return err }}, - {"remove node", func(snapshot ClusterSnapshot) error { - return snapshot.RemoveNode("node") + {"remove nodeInfo", func(snapshot 
ClusterSnapshot) error { + return snapshot.RemoveNodeInfo("node") }}, } @@ -385,7 +385,7 @@ func TestNode404(t *testing.T) { snapshot.Fork() assert.NoError(t, err) - err = snapshot.RemoveNode("node") + err = snapshot.RemoveNodeInfo("node") assert.NoError(t, err) // Node deleted after fork - shouldn't be able to operate on it. @@ -408,7 +408,7 @@ func TestNode404(t *testing.T) { err := snapshot.AddNodeInfo(framework.NewTestNodeInfo(node)) assert.NoError(t, err) - err = snapshot.RemoveNode("node") + err = snapshot.RemoveNodeInfo("node") assert.NoError(t, err) // Node deleted from base - shouldn't be able to operate on it. @@ -625,7 +625,7 @@ func TestPVCUsedByPods(t *testing.T) { assert.Equal(t, tc.exists, volumeExists) if tc.removePod != "" { - err = snapshot.RemovePod("default", tc.removePod, "node") + err = snapshot.ForceRemovePod("default", tc.removePod, "node") assert.NoError(t, err) volumeExists = snapshot.StorageInfos().IsPVCUsedByPods(schedulerframework.GetNamespacedName("default", tc.claimName)) @@ -698,7 +698,7 @@ func TestPVCClearAndFork(t *testing.T) { volumeExists = snapshot.StorageInfos().IsPVCUsedByPods(schedulerframework.GetNamespacedName("default", "claim1")) assert.Equal(t, true, volumeExists) - err = snapshot.AddPod(pod2, "node") + err = snapshot.ForceAddPod(pod2, "node") assert.NoError(t, err) volumeExists = snapshot.StorageInfos().IsPVCUsedByPods(schedulerframework.GetNamespacedName("default", "claim2")) diff --git a/cluster-autoscaler/simulator/clustersnapshot/delta.go b/cluster-autoscaler/simulator/clustersnapshot/delta.go index c490d679db41..3f7322b4684d 100644 --- a/cluster-autoscaler/simulator/clustersnapshot/delta.go +++ b/cluster-autoscaler/simulator/clustersnapshot/delta.go @@ -177,7 +177,7 @@ func (data *internalDeltaSnapshotData) clearPodCaches() { data.pvcNamespaceMap = nil } -func (data *internalDeltaSnapshotData) removeNode(nodeName string) error { +func (data *internalDeltaSnapshotData) removeNodeInfo(nodeName string) error { _, 
foundInDelta := data.addedNodeInfoMap[nodeName] if foundInDelta { // If node was added within this delta, delete this change. @@ -296,12 +296,12 @@ func (data *internalDeltaSnapshotData) commit() (*internalDeltaSnapshotData, err return data, nil } for node := range data.deletedNodeInfos { - if err := data.baseData.removeNode(node); err != nil { + if err := data.baseData.removeNodeInfo(node); err != nil { return nil, err } } for _, node := range data.modifiedNodeInfoMap { - if err := data.baseData.removeNode(node.Node().Name); err != nil { + if err := data.baseData.removeNodeInfo(node.Node().Name); err != nil { return nil, err } if err := data.baseData.addNodeInfo(node); err != nil { @@ -442,18 +442,18 @@ func (snapshot *DeltaClusterSnapshot) SetClusterState(nodes []*apiv1.Node, sched return nil } -// RemoveNode removes nodes (and pods scheduled to it) from the snapshot. -func (snapshot *DeltaClusterSnapshot) RemoveNode(nodeName string) error { - return snapshot.data.removeNode(nodeName) +// RemoveNodeInfo removes nodes (and pods scheduled to it) from the snapshot. +func (snapshot *DeltaClusterSnapshot) RemoveNodeInfo(nodeName string) error { + return snapshot.data.removeNodeInfo(nodeName) } -// AddPod adds pod to the snapshot and schedules it to given node. -func (snapshot *DeltaClusterSnapshot) AddPod(pod *apiv1.Pod, nodeName string) error { +// ForceAddPod adds pod to the snapshot and schedules it to given node. +func (snapshot *DeltaClusterSnapshot) ForceAddPod(pod *apiv1.Pod, nodeName string) error { return snapshot.data.addPod(pod, nodeName) } -// RemovePod removes pod from the snapshot. -func (snapshot *DeltaClusterSnapshot) RemovePod(namespace, podName, nodeName string) error { +// ForceRemovePod removes pod from the snapshot. 
+func (snapshot *DeltaClusterSnapshot) ForceRemovePod(namespace, podName, nodeName string) error { return snapshot.data.removePod(namespace, podName, nodeName) } diff --git a/cluster-autoscaler/simulator/clustersnapshot/test_utils.go b/cluster-autoscaler/simulator/clustersnapshot/test_utils.go index 0307ae78bca9..c457d6bf187d 100644 --- a/cluster-autoscaler/simulator/clustersnapshot/test_utils.go +++ b/cluster-autoscaler/simulator/clustersnapshot/test_utils.go @@ -20,6 +20,7 @@ import ( "testing" "github.com/stretchr/testify/assert" + apiv1 "k8s.io/api/core/v1" "k8s.io/autoscaler/cluster-autoscaler/simulator/framework" ) @@ -42,10 +43,10 @@ func InitializeClusterSnapshotOrDie( for _, pod := range pods { if pod.Spec.NodeName != "" { - err = snapshot.AddPod(pod, pod.Spec.NodeName) + err = snapshot.ForceAddPod(pod, pod.Spec.NodeName) assert.NoError(t, err, "error while adding pod %s/%s to node %s", pod.Namespace, pod.Name, pod.Spec.NodeName) } else if pod.Status.NominatedNodeName != "" { - err = snapshot.AddPod(pod, pod.Status.NominatedNodeName) + err = snapshot.ForceAddPod(pod, pod.Status.NominatedNodeName) assert.NoError(t, err, "error while adding pod %s/%s to nominated node %s", pod.Namespace, pod.Name, pod.Status.NominatedNodeName) } else { assert.Fail(t, "pod %s/%s does not have Spec.NodeName nor Status.NominatedNodeName set", pod.Namespace, pod.Name) diff --git a/cluster-autoscaler/simulator/scheduling/hinting_simulator.go b/cluster-autoscaler/simulator/scheduling/hinting_simulator.go index 2287d28810e4..2f24bb8bf4ba 100644 --- a/cluster-autoscaler/simulator/scheduling/hinting_simulator.go +++ b/cluster-autoscaler/simulator/scheduling/hinting_simulator.go @@ -73,7 +73,7 @@ func (s *HintingSimulator) TrySchedulePods(clusterSnapshot clustersnapshot.Clust if nodeName != "" { klogx.V(4).UpTo(loggingQuota).Infof("Pod %s/%s can be moved to %s", pod.Namespace, pod.Name, nodeName) - if err := clusterSnapshot.AddPod(pod, nodeName); err != nil { + if err := 
clusterSnapshot.ForceAddPod(pod, nodeName); err != nil { return nil, 0, fmt.Errorf("simulating scheduling of %s/%s to %s return error; %v", pod.Namespace, pod.Name, nodeName, err) } statuses = append(statuses, Status{Pod: pod, NodeName: nodeName})