diff --git a/cluster-autoscaler/core/scaleup/orchestrator/orchestrator.go b/cluster-autoscaler/core/scaleup/orchestrator/orchestrator.go index 4320cb32e46c..6c75831c12aa 100644 --- a/cluster-autoscaler/core/scaleup/orchestrator/orchestrator.go +++ b/cluster-autoscaler/core/scaleup/orchestrator/orchestrator.go @@ -138,15 +138,15 @@ func (o *ScaleUpOrchestrator) ScaleUp( } // Calculate expansion options - schedulablePods := map[string][]*apiv1.Pod{} + schedulablePodGroups := map[string][]estimator.PodEquivalenceGroup{} var options []expander.Option for _, nodeGroup := range validNodeGroups { - schedulablePods[nodeGroup.Id()] = o.SchedulablePods(podEquivalenceGroups, nodeGroup, nodeInfos[nodeGroup.Id()]) + schedulablePodGroups[nodeGroup.Id()] = o.SchedulablePodGroups(podEquivalenceGroups, nodeGroup, nodeInfos[nodeGroup.Id()]) } for _, nodeGroup := range validNodeGroups { - option := o.ComputeExpansionOption(nodeGroup, schedulablePods, nodeInfos, len(nodes)+len(upcomingNodes), now) + option := o.ComputeExpansionOption(nodeGroup, schedulablePodGroups, nodeInfos, len(nodes)+len(upcomingNodes), now) o.processors.BinpackingLimiter.MarkProcessed(o.autoscalingContext, nodeGroup.Id()) if len(option.Pods) == 0 || option.NodeCount == 0 { @@ -195,14 +195,14 @@ func (o *ScaleUpOrchestrator) ScaleUp( createNodeGroupResults := make([]nodegroups.CreateNodeGroupResult, 0) if !bestOption.NodeGroup.Exist() { var scaleUpStatus *status.ScaleUpStatus - createNodeGroupResults, scaleUpStatus, aErr = o.CreateNodeGroup(bestOption, nodeInfos, schedulablePods, podEquivalenceGroups, daemonSets) + createNodeGroupResults, scaleUpStatus, aErr = o.CreateNodeGroup(bestOption, nodeInfos, schedulablePodGroups, podEquivalenceGroups, daemonSets) if aErr != nil { return scaleUpStatus, aErr } } // Recompute similar node groups in case they need to be updated - bestOption.SimilarNodeGroups = o.ComputeSimilarNodeGroups(bestOption.NodeGroup, nodeInfos, schedulablePods, now) + bestOption.SimilarNodeGroups = o.ComputeSimilarNodeGroups(bestOption.NodeGroup, nodeInfos, schedulablePodGroups, now) if bestOption.SimilarNodeGroups != nil { // if similar node groups are found, log about them similarNodeGroupIds := make([]string, 0) @@ -440,20 +440,20 @@ func (o *ScaleUpOrchestrator) filterValidScaleUpNodeGroups( // ComputeExpansionOption computes expansion option based on pending pods and cluster state. 
func (o *ScaleUpOrchestrator) ComputeExpansionOption( nodeGroup cloudprovider.NodeGroup, - schedulablePods map[string][]*apiv1.Pod, + schedulablePodGroups map[string][]estimator.PodEquivalenceGroup, nodeInfos map[string]*schedulerframework.NodeInfo, currentNodeCount int, now time.Time, ) expander.Option { option := expander.Option{NodeGroup: nodeGroup} - pods := schedulablePods[nodeGroup.Id()] + podGroups := schedulablePodGroups[nodeGroup.Id()] nodeInfo := nodeInfos[nodeGroup.Id()] - if len(pods) == 0 { + if len(podGroups) == 0 { return option } - option.SimilarNodeGroups = o.ComputeSimilarNodeGroups(nodeGroup, nodeInfos, schedulablePods, now) + option.SimilarNodeGroups = o.ComputeSimilarNodeGroups(nodeGroup, nodeInfos, schedulablePodGroups, now) estimateStart := time.Now() expansionEstimator := o.estimatorBuilder( @@ -461,7 +461,7 @@ func (o *ScaleUpOrchestrator) ComputeExpansionOption( o.autoscalingContext.ClusterSnapshot, estimator.NewEstimationContext(o.autoscalingContext.MaxNodesTotal, option.SimilarNodeGroups, currentNodeCount), ) - option.NodeCount, option.Pods = expansionEstimator.Estimate(pods, nodeInfo, nodeGroup) + option.NodeCount, option.Pods = expansionEstimator.Estimate(podGroups, nodeInfo, nodeGroup) metrics.UpdateDurationFromStart(metrics.Estimate, estimateStart) autoscalingOptions, err := nodeGroup.GetOptions(o.autoscalingContext.NodeGroupDefaults) @@ -480,7 +480,7 @@ func (o *ScaleUpOrchestrator) ComputeExpansionOption( func (o *ScaleUpOrchestrator) CreateNodeGroup( initialOption *expander.Option, nodeInfos map[string]*schedulerframework.NodeInfo, - schedulablePods map[string][]*apiv1.Pod, + schedulablePodGroups map[string][]estimator.PodEquivalenceGroup, podEquivalenceGroups []*equivalence.PodGroup, daemonSets []*appsv1.DaemonSet, ) ([]nodegroups.CreateNodeGroupResult, *status.ScaleUpStatus, errors.AutoscalerError) { @@ -503,16 +503,16 @@ func (o *ScaleUpOrchestrator) CreateNodeGroup( mainCreatedNodeInfo, aErr := utils.GetNodeInfoFromTemplate(createNodeGroupResult.MainCreatedNodeGroup, daemonSets, o.taintConfig) if aErr == nil { nodeInfos[createNodeGroupResult.MainCreatedNodeGroup.Id()] = mainCreatedNodeInfo - schedulablePods[createNodeGroupResult.MainCreatedNodeGroup.Id()] = o.SchedulablePods(podEquivalenceGroups, createNodeGroupResult.MainCreatedNodeGroup, mainCreatedNodeInfo) + schedulablePodGroups[createNodeGroupResult.MainCreatedNodeGroup.Id()] = o.SchedulablePodGroups(podEquivalenceGroups, createNodeGroupResult.MainCreatedNodeGroup, mainCreatedNodeInfo) } else { klog.Warningf("Cannot build node info for newly created main node group %v; balancing similar node groups may not work; err=%v", createNodeGroupResult.MainCreatedNodeGroup.Id(), aErr) // Use node info based on expansion candidate but update Id which likely changed when node group was created. 
nodeInfos[createNodeGroupResult.MainCreatedNodeGroup.Id()] = nodeInfos[oldId] - schedulablePods[createNodeGroupResult.MainCreatedNodeGroup.Id()] = schedulablePods[oldId] + schedulablePodGroups[createNodeGroupResult.MainCreatedNodeGroup.Id()] = schedulablePodGroups[oldId] } if oldId != createNodeGroupResult.MainCreatedNodeGroup.Id() { delete(nodeInfos, oldId) - delete(schedulablePods, oldId) + delete(schedulablePodGroups, oldId) } for _, nodeGroup := range createNodeGroupResult.ExtraCreatedNodeGroups { nodeInfo, aErr := utils.GetNodeInfoFromTemplate(nodeGroup, daemonSets, o.taintConfig) @@ -521,7 +521,7 @@ func (o *ScaleUpOrchestrator) CreateNodeGroup( continue } nodeInfos[nodeGroup.Id()] = nodeInfo - schedulablePods[nodeGroup.Id()] = o.SchedulablePods(podEquivalenceGroups, nodeGroup, nodeInfo) + schedulablePodGroups[nodeGroup.Id()] = o.SchedulablePodGroups(podEquivalenceGroups, nodeGroup, nodeInfo) } // Update ClusterStateRegistry so similar nodegroups rebalancing works. @@ -531,13 +531,13 @@ func (o *ScaleUpOrchestrator) CreateNodeGroup( return createNodeGroupResults, nil, nil } -// SchedulablePods returns a list of pods that could be scheduled +// SchedulablePodGroups returns a list of pods that could be scheduled // in a given node group after a scale up. -func (o *ScaleUpOrchestrator) SchedulablePods( +func (o *ScaleUpOrchestrator) SchedulablePodGroups( podEquivalenceGroups []*equivalence.PodGroup, nodeGroup cloudprovider.NodeGroup, nodeInfo *schedulerframework.NodeInfo, -) []*apiv1.Pod { +) []estimator.PodEquivalenceGroup { o.autoscalingContext.ClusterSnapshot.Fork() defer o.autoscalingContext.ClusterSnapshot.Revert() @@ -548,15 +548,17 @@ func (o *ScaleUpOrchestrator) SchedulablePods( } if err := o.autoscalingContext.ClusterSnapshot.AddNodeWithPods(nodeInfo.Node(), allPods); err != nil { klog.Errorf("Error while adding test Node: %v", err) - return []*apiv1.Pod{} + return []estimator.PodEquivalenceGroup{} } - var schedulablePods []*apiv1.Pod + var schedulablePodGroups []estimator.PodEquivalenceGroup for _, eg := range podEquivalenceGroups { samplePod := eg.Pods[0] if err := o.autoscalingContext.PredicateChecker.CheckPredicates(o.autoscalingContext.ClusterSnapshot, samplePod, nodeInfo.Node().Name); err == nil { // Add pods to option. - schedulablePods = append(schedulablePods, eg.Pods...) + schedulablePodGroups = append(schedulablePodGroups, estimator.PodEquivalenceGroup{ + Pods: eg.Pods, + }) // Mark pod group as (theoretically) schedulable. eg.Schedulable = true } else { @@ -568,7 +570,7 @@ func (o *ScaleUpOrchestrator) SchedulablePods( } } - return schedulablePods + return schedulablePodGroups } // UpcomingNodes returns a list of nodes that are not ready but should be. 
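Note on the hunks above: the orchestrator now passes `[]estimator.PodEquivalenceGroup` per node group instead of flat pod lists, so the grouping of pods with identical scheduling requirements survives all the way into the estimator, which can then binpack group by group. The sketch below is illustrative only and is not part of this patch: it shows the shape of that data; `groupKey`, `buildPodEquivalenceGroups`, and the `example` package are hypothetical stand-ins for the real grouping done by the `equivalence` package.

```go
package example

import (
	apiv1 "k8s.io/api/core/v1"

	"k8s.io/autoscaler/cluster-autoscaler/estimator"
)

// groupKey is a hypothetical stand-in for the scheduling-requirements key the
// equivalence package computes; it only serves to make the data shape concrete.
func groupKey(p *apiv1.Pod) string {
	return p.Namespace + "/" + p.Labels["app"]
}

// buildPodEquivalenceGroups wraps pods that share a key into PodEquivalenceGroups,
// the unit that SchedulablePodGroups, the pod orderer, and Estimate now operate on.
func buildPodEquivalenceGroups(pods []*apiv1.Pod) []estimator.PodEquivalenceGroup {
	byKey := map[string][]*apiv1.Pod{}
	for _, p := range pods {
		k := groupKey(p)
		byKey[k] = append(byKey[k], p)
	}
	groups := make([]estimator.PodEquivalenceGroup, 0, len(byKey))
	for _, ps := range byKey {
		groups = append(groups, estimator.PodEquivalenceGroup{Pods: ps})
	}
	return groups
}

// Predicate checks run against a single exemplar per group, mirroring the
// samplePod logic in SchedulablePodGroups above.
func exemplars(groups []estimator.PodEquivalenceGroup) []*apiv1.Pod {
	out := make([]*apiv1.Pod, 0, len(groups))
	for i := range groups {
		if p := groups[i].Exemplar(); p != nil {
			out = append(out, p)
		}
	}
	return out
}
```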
@@ -652,7 +654,7 @@ func (o *ScaleUpOrchestrator) GetCappedNewNodeCount(newNodeCount, currentNodeCou func (o *ScaleUpOrchestrator) ComputeSimilarNodeGroups( nodeGroup cloudprovider.NodeGroup, nodeInfos map[string]*schedulerframework.NodeInfo, - schedulablePods map[string][]*apiv1.Pod, + schedulablePodGroups map[string][]estimator.PodEquivalenceGroup, now time.Time, ) []cloudprovider.NodeGroup { if !o.autoscalingContext.BalanceSimilarNodeGroups { @@ -667,8 +669,8 @@ func (o *ScaleUpOrchestrator) ComputeSimilarNodeGroups( return nil } - groupSchedulablePods, found := schedulablePods[nodeGroup.Id()] - if !found || len(groupSchedulablePods) == 0 { + podGroups, found := schedulablePodGroups[nodeGroup.Id()] + if !found || len(podGroups) == 0 { return nil } @@ -683,7 +685,7 @@ func (o *ScaleUpOrchestrator) ComputeSimilarNodeGroups( // Non-existing node groups are created later so skip check for them. if ng.Exist() && !o.clusterStateRegistry.NodeGroupScaleUpSafety(ng, now).SafeToScale { klog.V(2).Infof("Ignoring node group %s when balancing: group is not ready for scaleup", ng.Id()) - } else if similarSchedulablePods, found := schedulablePods[ng.Id()]; found && matchingSchedulablePods(groupSchedulablePods, similarSchedulablePods) { + } else if similarPodGroups, found := schedulablePodGroups[ng.Id()]; found && matchingSchedulablePodGroups(podGroups, similarPodGroups) { validSimilarNodeGroups = append(validSimilarNodeGroups, ng) } } @@ -691,13 +693,13 @@ func (o *ScaleUpOrchestrator) ComputeSimilarNodeGroups( return validSimilarNodeGroups } -func matchingSchedulablePods(groupSchedulablePods []*apiv1.Pod, similarSchedulablePods []*apiv1.Pod) bool { - schedulablePods := make(map[*apiv1.Pod]bool) - for _, pod := range similarSchedulablePods { - schedulablePods[pod] = true +func matchingSchedulablePodGroups(podGroups []estimator.PodEquivalenceGroup, similarPodGroups []estimator.PodEquivalenceGroup) bool { + schedulableSamplePods := make(map[*apiv1.Pod]bool) + for _, podGroup := range similarPodGroups { + schedulableSamplePods[podGroup.Exemplar()] = true } - for _, pod := range groupSchedulablePods { - if _, found := schedulablePods[pod]; !found { + for _, podGroup := range podGroups { + if _, found := schedulableSamplePods[podGroup.Exemplar()]; !found { return false } } diff --git a/cluster-autoscaler/core/scaleup/orchestrator/orchestrator_test.go b/cluster-autoscaler/core/scaleup/orchestrator/orchestrator_test.go index 095e8b8fd1ee..3972602a6c50 100644 --- a/cluster-autoscaler/core/scaleup/orchestrator/orchestrator_test.go +++ b/cluster-autoscaler/core/scaleup/orchestrator/orchestrator_test.go @@ -52,6 +52,7 @@ import ( appsv1 "k8s.io/api/apps/v1" apiv1 "k8s.io/api/core/v1" + v1 "k8s.io/api/core/v1" "k8s.io/client-go/kubernetes/fake" schedulerframework "k8s.io/kubernetes/pkg/scheduler/framework" @@ -1208,9 +1209,9 @@ func (p *constNodeGroupSetProcessor) BalanceScaleUpBetweenGroups(_ *context.Auto func (p *constNodeGroupSetProcessor) CleanUp() {} func TestComputeSimilarNodeGroups(t *testing.T) { - pod1 := BuildTestPod("p1", 100, 1000) - pod2 := BuildTestPod("p2", 100, 1000) - pod3 := BuildTestPod("p3", 100, 1000) + podGroup1 := estimator.PodEquivalenceGroup{Pods: []*v1.Pod{BuildTestPod("p1", 100, 1000)}} + podGroup2 := estimator.PodEquivalenceGroup{Pods: []*v1.Pod{BuildTestPod("p2", 100, 1000)}} + podGroup3 := estimator.PodEquivalenceGroup{Pods: []*v1.Pod{BuildTestPod("p3", 100, 1000)}} testCases := []struct { name string @@ -1218,7 +1219,7 @@ func TestComputeSimilarNodeGroups(t *testing.T) { 
similarNodeGroups []string otherNodeGroups []string balancingEnabled bool - schedulablePods map[string][]*apiv1.Pod + schedulablePodGroups map[string][]estimator.PodEquivalenceGroup wantSimilarNodeGroups []string }{ { @@ -1242,12 +1243,12 @@ func TestComputeSimilarNodeGroups(t *testing.T) { similarNodeGroups: []string{"ng2", "ng3"}, otherNodeGroups: []string{"pg1", "pg2"}, balancingEnabled: false, - schedulablePods: map[string][]*apiv1.Pod{ - "ng1": {pod1}, - "ng2": {pod1}, - "ng3": {pod1}, - "pg1": {pod1}, - "pg2": {pod1}, + schedulablePodGroups: map[string][]estimator.PodEquivalenceGroup{ + "ng1": {podGroup1}, + "ng2": {podGroup1}, + "ng3": {podGroup1}, + "pg1": {podGroup1}, + "pg2": {podGroup1}, }, wantSimilarNodeGroups: []string{}, }, @@ -1257,12 +1258,12 @@ func TestComputeSimilarNodeGroups(t *testing.T) { similarNodeGroups: []string{"ng2", "ng3"}, otherNodeGroups: []string{"pg1", "pg2"}, balancingEnabled: true, - schedulablePods: map[string][]*apiv1.Pod{ - "ng1": {pod1}, - "ng2": {pod1}, - "ng3": {pod1}, - "pg1": {pod1}, - "pg2": {pod1}, + schedulablePodGroups: map[string][]estimator.PodEquivalenceGroup{ + "ng1": {podGroup1}, + "ng2": {podGroup1}, + "ng3": {podGroup1}, + "pg1": {podGroup1}, + "pg2": {podGroup1}, }, wantSimilarNodeGroups: []string{"ng2", "ng3"}, }, @@ -1272,12 +1273,12 @@ func TestComputeSimilarNodeGroups(t *testing.T) { similarNodeGroups: []string{"ng2", "ng3"}, otherNodeGroups: []string{"pg1", "pg2"}, balancingEnabled: true, - schedulablePods: map[string][]*apiv1.Pod{ - "ng1": {pod1}, - "ng2": {pod1, pod2}, - "ng3": {pod1, pod2, pod3}, - "pg1": {pod1, pod2}, - "pg2": {pod1, pod2, pod3}, + schedulablePodGroups: map[string][]estimator.PodEquivalenceGroup{ + "ng1": {podGroup1}, + "ng2": {podGroup1, podGroup2}, + "ng3": {podGroup1, podGroup2, podGroup3}, + "pg1": {podGroup1, podGroup2}, + "pg2": {podGroup1, podGroup2, podGroup3}, }, wantSimilarNodeGroups: []string{"ng2", "ng3"}, }, @@ -1287,10 +1288,10 @@ func TestComputeSimilarNodeGroups(t *testing.T) { similarNodeGroups: []string{"ng2", "ng3"}, otherNodeGroups: []string{"pg1", "pg2"}, balancingEnabled: true, - schedulablePods: map[string][]*apiv1.Pod{ - "ng1": {pod1, pod2}, - "ng2": {pod1}, - "pg1": {pod1}, + schedulablePodGroups: map[string][]estimator.PodEquivalenceGroup{ + "ng1": {podGroup1, podGroup2}, + "ng2": {podGroup1}, + "pg1": {podGroup1}, }, wantSimilarNodeGroups: []string{}, }, @@ -1331,7 +1332,7 @@ func TestComputeSimilarNodeGroups(t *testing.T) { suOrchestrator := &ScaleUpOrchestrator{} suOrchestrator.Initialize(&ctx, &processors.AutoscalingProcessors{NodeGroupSetProcessor: nodeGroupSetProcessor}, clusterState, newEstimatorBuilder(), taints.TaintConfig{}) - similarNodeGroups := suOrchestrator.ComputeSimilarNodeGroups(provider.GetNodeGroup(tc.nodeGroup), nodeInfos, tc.schedulablePods, now) + similarNodeGroups := suOrchestrator.ComputeSimilarNodeGroups(provider.GetNodeGroup(tc.nodeGroup), nodeInfos, tc.schedulablePodGroups, now) var gotSimilarNodeGroups []string for _, ng := range similarNodeGroups { diff --git a/cluster-autoscaler/estimator/binpacking_estimator.go b/cluster-autoscaler/estimator/binpacking_estimator.go index 86a26de22550..229a2e26420f 100644 --- a/cluster-autoscaler/estimator/binpacking_estimator.go +++ b/cluster-autoscaler/estimator/binpacking_estimator.go @@ -36,6 +36,16 @@ type BinpackingNodeEstimator struct { podOrderer EstimationPodOrderer context EstimationContext estimationAnalyserFunc EstimationAnalyserFunc // optional + +} + +// estimationState contains helper variables to avoid 
copying them independently in each function. +type estimationState struct { + scheduledPods []*apiv1.Pod + newNodeNameIndex int + lastNodeName string + newNodeNames map[string]bool + newNodesWithPods map[string]bool } // NewBinpackingNodeEstimator builds a new BinpackingNodeEstimator. @@ -57,6 +67,16 @@ func NewBinpackingNodeEstimator( } } +func newEstimationState() *estimationState { + return &estimationState{ + scheduledPods: []*apiv1.Pod{}, + newNodeNameIndex: 0, + lastNodeName: "", + newNodeNames: map[string]bool{}, + newNodesWithPods: map[string]bool{}, + } +} + // Estimate implements First-Fit bin-packing approximation algorithm // The ordering of the pods depend on the EstimatePodOrderer, the default // order is DecreasingPodOrderer @@ -68,49 +88,92 @@ func NewBinpackingNodeEstimator( // It is assumed that all pods from the given list can fit to nodeTemplate. // Returns the number of nodes needed to accommodate all pods from the list. func (e *BinpackingNodeEstimator) Estimate( - pods []*apiv1.Pod, + podsEquivalenceGroups []PodEquivalenceGroup, nodeTemplate *schedulerframework.NodeInfo, - nodeGroup cloudprovider.NodeGroup) (int, []*apiv1.Pod) { + nodeGroup cloudprovider.NodeGroup, +) (int, []*apiv1.Pod) { - e.limiter.StartEstimation(pods, nodeGroup, e.context) + e.limiter.StartEstimation(podsEquivalenceGroups, nodeGroup, e.context) defer e.limiter.EndEstimation() - pods = e.podOrderer.Order(pods, nodeTemplate, nodeGroup) - - newNodeNames := make(map[string]bool) - newNodesWithPods := make(map[string]bool) + podsEquivalenceGroups = e.podOrderer.Order(podsEquivalenceGroups, nodeTemplate, nodeGroup) e.clusterSnapshot.Fork() defer func() { e.clusterSnapshot.Revert() }() - newNodeNameIndex := 0 - scheduledPods := []*apiv1.Pod{} - lastNodeName := "" + estimationState := newEstimationState() + for _, podsEquivalenceGroup := range podsEquivalenceGroups { + var err error + var remainingPods []*apiv1.Pod - for _, pod := range pods { - found := false + remainingPods, err = e.tryToScheduleOnExistingNodes(estimationState, podsEquivalenceGroup.Pods) + if err != nil { + klog.Errorf(err.Error()) + return 0, nil + } + err = e.tryToScheduleOnNewNodes(estimationState, nodeTemplate, remainingPods) + if err != nil { + klog.Errorf(err.Error()) + return 0, nil + } + } + + if e.estimationAnalyserFunc != nil { + e.estimationAnalyserFunc(e.clusterSnapshot, nodeGroup, estimationState.newNodesWithPods) + } + return len(estimationState.newNodesWithPods), estimationState.scheduledPods +} + +func (e *BinpackingNodeEstimator) tryToScheduleOnExistingNodes( + estimationState *estimationState, + pods []*apiv1.Pod, +) ([]*apiv1.Pod, error) { + var index int + for index = 0; index < len(pods); index++ { + pod := pods[index] + + // Check schedulability on all nodes created during simulation nodeName, err := e.predicateChecker.FitsAnyNodeMatching(e.clusterSnapshot, pod, func(nodeInfo *schedulerframework.NodeInfo) bool { - return newNodeNames[nodeInfo.Node().Name] + return estimationState.newNodeNames[nodeInfo.Node().Name] }) - if err == nil { - found = true - if err := e.clusterSnapshot.AddPod(pod, nodeName); err != nil { - klog.Errorf("Error adding pod %v.%v to node %v in ClusterSnapshot; %v", pod.Namespace, pod.Name, nodeName, err) - return 0, nil + if err != nil { + break + } + + if err := e.tryToAddNode(estimationState, pod, nodeName); err != nil { + return nil, err + } + } + return pods[index:], nil +} + +func (e *BinpackingNodeEstimator) tryToScheduleOnNewNodes( + estimationState *estimationState, + nodeTemplate
*schedulerframework.NodeInfo, + pods []*apiv1.Pod, +) error { + for _, pod := range pods { + found := false + + if estimationState.lastNodeName != "" { + // Check schedulability only on the newly created node + if err := e.predicateChecker.CheckPredicates(e.clusterSnapshot, pod, estimationState.lastNodeName); err == nil { + found = true + if err := e.tryToAddNode(estimationState, pod, estimationState.lastNodeName); err != nil { + return err + } } - scheduledPods = append(scheduledPods, pod) - newNodesWithPods[nodeName] = true } if !found { // If the last node we've added is empty and the pod couldn't schedule on it, it wouldn't be able to schedule // on a new node either. There is no point adding more nodes to snapshot in such case, especially because of // performance cost each extra node adds to future FitsAnyNodeMatching calls. - if lastNodeName != "" && !newNodesWithPods[lastNodeName] { - continue + if estimationState.lastNodeName != "" && !estimationState.newNodesWithPods[estimationState.lastNodeName] { + break } // Stop binpacking if we reach the limit of nodes we can add. @@ -124,49 +187,52 @@ func (e *BinpackingNodeEstimator) Estimate( } // Add new node - newNodeName, err := e.addNewNodeToSnapshot(nodeTemplate, newNodeNameIndex) - if err != nil { - klog.Errorf("Error while adding new node for template to ClusterSnapshot; %v", err) - return 0, nil + if err := e.addNewNodeToSnapshot(estimationState, nodeTemplate); err != nil { + return fmt.Errorf("Error while adding new node for template to ClusterSnapshot; %w", err) } - newNodeNameIndex++ - newNodeNames[newNodeName] = true - lastNodeName = newNodeName // And try to schedule pod to it. // Note that this may still fail (ex. if topology spreading with zonal topologyKey is used); // in this case we can't help the pending pod. We keep the node in clusterSnapshot to avoid // adding and removing node to snapshot for each such pod.
- if err := e.predicateChecker.CheckPredicates(e.clusterSnapshot, pod, newNodeName); err != nil { - continue + if err := e.predicateChecker.CheckPredicates(e.clusterSnapshot, pod, estimationState.lastNodeName); err != nil { + break } - if err := e.clusterSnapshot.AddPod(pod, newNodeName); err != nil { - klog.Errorf("Error adding pod %v.%v to node %v in ClusterSnapshot; %v", pod.Namespace, pod.Name, newNodeName, err) - return 0, nil + if err := e.tryToAddNode(estimationState, pod, estimationState.lastNodeName); err != nil { + return err } - newNodesWithPods[newNodeName] = true - scheduledPods = append(scheduledPods, pod) } } - - if e.estimationAnalyserFunc != nil { - e.estimationAnalyserFunc(e.clusterSnapshot, nodeGroup, newNodesWithPods) - } - - return len(newNodesWithPods), scheduledPods + return nil } func (e *BinpackingNodeEstimator) addNewNodeToSnapshot( + estimationState *estimationState, template *schedulerframework.NodeInfo, - nameIndex int) (string, error) { - - newNodeInfo := scheduler.DeepCopyTemplateNode(template, fmt.Sprintf("e-%d", nameIndex)) +) error { + newNodeInfo := scheduler.DeepCopyTemplateNode(template, fmt.Sprintf("e-%d", estimationState.newNodeNameIndex)) var pods []*apiv1.Pod for _, podInfo := range newNodeInfo.Pods { pods = append(pods, podInfo.Pod) } if err := e.clusterSnapshot.AddNodeWithPods(newNodeInfo.Node(), pods); err != nil { - return "", err + return err + } + estimationState.newNodeNameIndex++ + estimationState.lastNodeName = newNodeInfo.Node().Name + estimationState.newNodeNames[estimationState.lastNodeName] = true + return nil +} + +func (e *BinpackingNodeEstimator) tryToAddNode( + estimationState *estimationState, + pod *apiv1.Pod, + nodeName string, +) error { + if err := e.clusterSnapshot.AddPod(pod, nodeName); err != nil { + return fmt.Errorf("Error adding pod %v.%v to node %v in ClusterSnapshot; %v", pod.Namespace, pod.Name, nodeName, err) } - return newNodeInfo.Node().Name, nil + estimationState.newNodesWithPods[nodeName] = true + estimationState.scheduledPods = append(estimationState.scheduledPods, pod) + return nil } diff --git a/cluster-autoscaler/estimator/binpacking_estimator_test.go b/cluster-autoscaler/estimator/binpacking_estimator_test.go index 5dc06d06efba..9937266601b2 100644 --- a/cluster-autoscaler/estimator/binpacking_estimator_test.go +++ b/cluster-autoscaler/estimator/binpacking_estimator_test.go @@ -32,57 +32,17 @@ import ( "github.com/stretchr/testify/assert" ) -func makePods(cpuPerPod int64, memoryPerPod int64, hostport int32, maxSkew int32, topologySpreadingKey string, podCount int) []*apiv1.Pod { - pod := &apiv1.Pod{ - ObjectMeta: metav1.ObjectMeta{ - Name: "estimatee", - Namespace: "universe", - Labels: map[string]string{ - "app": "estimatee", - }, - }, - Spec: apiv1.PodSpec{ - Containers: []apiv1.Container{ - { - Resources: apiv1.ResourceRequirements{ - Requests: apiv1.ResourceList{ - apiv1.ResourceCPU: *resource.NewMilliQuantity(cpuPerPod, resource.DecimalSI), - apiv1.ResourceMemory: *resource.NewQuantity(memoryPerPod*units.MiB, resource.DecimalSI), - }, - }, - }, - }, - }, - } - if hostport > 0 { - pod.Spec.Containers[0].Ports = []apiv1.ContainerPort{ - { - HostPort: hostport, - }, - } - } - if maxSkew > 0 { - pod.Spec.TopologySpreadConstraints = []apiv1.TopologySpreadConstraint{ - { - MaxSkew: maxSkew, - TopologyKey: topologySpreadingKey, - WhenUnsatisfiable: "DoNotSchedule", - LabelSelector: &metav1.LabelSelector{ - MatchLabels: map[string]string{ - "app": "estimatee", - }, - }, - }, - } - } +func 
makePodEquivalenceGroup(pod *apiv1.Pod, podCount int) PodEquivalenceGroup { pods := []*apiv1.Pod{} for i := 0; i < podCount; i++ { pods = append(pods, pod) } - return pods + return PodEquivalenceGroup{ + Pods: pods, + } } -func makeNode(cpu int64, mem int64, name string, zone string) *apiv1.Node { +func makeNode(cpu, mem, podCount int64, name string, zone string) *apiv1.Node { node := &apiv1.Node{ ObjectMeta: metav1.ObjectMeta{ Name: name, @@ -95,7 +55,7 @@ func makeNode(cpu int64, mem int64, name string, zone string) *apiv1.Node { Capacity: apiv1.ResourceList{ apiv1.ResourceCPU: *resource.NewMilliQuantity(cpu, resource.DecimalSI), apiv1.ResourceMemory: *resource.NewQuantity(mem*units.MiB, resource.DecimalSI), - apiv1.ResourcePods: *resource.NewQuantity(10, resource.DecimalSI), + apiv1.ResourcePods: *resource.NewQuantity(podCount, resource.DecimalSI), }, }, } @@ -105,74 +65,144 @@ func makeNode(cpu int64, mem int64, name string, zone string) *apiv1.Node { } func TestBinpackingEstimate(t *testing.T) { - highResourcePodList := makePods(500, 1000, 0, 0, "", 10) + highResourcePodGroup := makePodEquivalenceGroup( + BuildTestPod( + "estimatee", + 500, + 1000, + WithNamespace("universe"), + WithLabels(map[string]string{ + "app": "estimatee", + }), + ), + 10, + ) testCases := []struct { name string millicores int64 memory int64 maxNodes int - pods []*apiv1.Pod + podsEquivalenceGroup []PodEquivalenceGroup topologySpreadingKey string expectNodeCount int expectPodCount int expectProcessedPods []*apiv1.Pod }{ { - name: "simple resource-based binpacking", - millicores: 350*3 - 50, - memory: 2 * 1000, - pods: makePods(350, 1000, 0, 0, "", 10), + name: "simple resource-based binpacking", + millicores: 350*3 - 50, + memory: 2 * 1000, + podsEquivalenceGroup: []PodEquivalenceGroup{makePodEquivalenceGroup( + BuildTestPod( + "estimatee", + 350, + 1000, + WithNamespace("universe"), + WithLabels(map[string]string{ + "app": "estimatee", + })), 10)}, expectNodeCount: 5, expectPodCount: 10, }, { - name: "pods-per-node bound binpacking", - millicores: 10000, - memory: 20000, - pods: makePods(10, 100, 0, 0, "", 20), + name: "pods-per-node bound binpacking", + millicores: 10000, + memory: 20000, + podsEquivalenceGroup: []PodEquivalenceGroup{makePodEquivalenceGroup( + BuildTestPod( + "estimatee", + 10, + 100, + WithNamespace("universe"), + WithLabels(map[string]string{ + "app": "estimatee", + })), 20)}, expectNodeCount: 2, expectPodCount: 20, }, { - name: "hostport conflict forces pod-per-node", - millicores: 1000, - memory: 5000, - pods: makePods(200, 1000, 5555, 0, "", 8), + name: "hostport conflict forces pod-per-node", + millicores: 1000, + memory: 5000, + podsEquivalenceGroup: []PodEquivalenceGroup{makePodEquivalenceGroup( + BuildTestPod( + "estimatee", + 200, + 1000, + WithNamespace("universe"), + WithLabels(map[string]string{ + "app": "estimatee", + }), + WithHostPort(5555)), 8)}, expectNodeCount: 8, expectPodCount: 8, }, { - name: "limiter cuts binpacking", - millicores: 1000, - memory: 5000, - pods: makePods(500, 1000, 0, 0, "", 20), + name: "limiter cuts binpacking", + millicores: 1000, + memory: 5000, + podsEquivalenceGroup: []PodEquivalenceGroup{makePodEquivalenceGroup( + BuildTestPod( + "estimatee", + 500, + 1000, + WithNamespace("universe"), + WithLabels(map[string]string{ + "app": "estimatee", + })), 20)}, maxNodes: 5, expectNodeCount: 5, expectPodCount: 10, }, { - name: "decreasing ordered pods are processed first", - millicores: 1000, - memory: 5000, - pods: append(makePods(50, 1000, 0, 0, "", 10), 
highResourcePodList...), + name: "decreasing ordered pods are processed first", + millicores: 1000, + memory: 5000, + podsEquivalenceGroup: append([]PodEquivalenceGroup{makePodEquivalenceGroup( + BuildTestPod( + "estimatee", + 50, + 1000, + WithNamespace("universe"), + WithLabels(map[string]string{ + "app": "estimatee", + })), 10)}, highResourcePodGroup), maxNodes: 5, expectNodeCount: 5, expectPodCount: 10, - expectProcessedPods: highResourcePodList, + expectProcessedPods: highResourcePodGroup.Pods, }, { - name: "hostname topology spreading with maxSkew=2 forces 2 pods/node", - millicores: 1000, - memory: 5000, - pods: makePods(20, 100, 0, 2, "kubernetes.io/hostname", 8), + name: "hostname topology spreading with maxSkew=2 forces 2 pods/node", + millicores: 1000, + memory: 5000, + podsEquivalenceGroup: []PodEquivalenceGroup{makePodEquivalenceGroup( + BuildTestPod( + "estimatee", + 20, + 100, + WithNamespace("universe"), + WithLabels(map[string]string{ + "app": "estimatee", + }), + WithMaxSkew(2, "kubernetes.io/hostname")), 8)}, expectNodeCount: 4, expectPodCount: 8, }, { - name: "zonal topology spreading with maxSkew=2 only allows 2 pods to schedule", - millicores: 1000, - memory: 5000, - pods: makePods(20, 100, 0, 2, "topology.kubernetes.io/zone", 8), + name: "zonal topology spreading with maxSkew=2 only allows 2 pods to schedule", + millicores: 1000, + memory: 5000, + podsEquivalenceGroup: []PodEquivalenceGroup{makePodEquivalenceGroup( + BuildTestPod( + "estimatee", + 20, + 100, + WithNamespace("universe"), + WithLabels(map[string]string{ + "app": "estimatee", + }), + WithMaxSkew(2, "topology.kubernetes.io/zone")), 8)}, expectNodeCount: 1, expectPodCount: 2, }, @@ -181,18 +211,18 @@ func TestBinpackingEstimate(t *testing.T) { t.Run(tc.name, func(t *testing.T) { clusterSnapshot := clustersnapshot.NewBasicClusterSnapshot() // Add one node in different zone to trigger topology spread constraints - clusterSnapshot.AddNode(makeNode(100, 100, "oldnode", "zone-jupiter")) + clusterSnapshot.AddNode(makeNode(100, 100, 10, "oldnode", "zone-jupiter")) predicateChecker, err := predicatechecker.NewTestPredicateChecker() assert.NoError(t, err) limiter := NewThresholdBasedEstimationLimiter([]Threshold{NewStaticThreshold(tc.maxNodes, time.Duration(0))}) processor := NewDecreasingPodOrderer() estimator := NewBinpackingNodeEstimator(predicateChecker, clusterSnapshot, limiter, processor, nil /* EstimationContext */, nil /* EstimationAnalyserFunc */) - node := makeNode(tc.millicores, tc.memory, "template", "zone-mars") + node := makeNode(tc.millicores, tc.memory, 10, "template", "zone-mars") nodeInfo := schedulerframework.NewNodeInfo() nodeInfo.SetNode(node) - estimatedNodes, estimatedPods := estimator.Estimate(tc.pods, nodeInfo, nil) + estimatedNodes, estimatedPods := estimator.Estimate(tc.podsEquivalenceGroup, nodeInfo, nil) assert.Equal(t, tc.expectNodeCount, estimatedNodes) assert.Equal(t, tc.expectPodCount, len(estimatedPods)) if tc.expectProcessedPods != nil { @@ -201,3 +231,54 @@ func TestBinpackingEstimate(t *testing.T) { }) } } + +func BenchmarkBinpackingEstimate(b *testing.B) { + millicores := int64(1000) + memory := int64(5000) + podsPerNode := int64(100) + maxNodes := 3000 + expectNodeCount := 2595 + expectPodCount := 51000 + podsEquivalenceGroup := []PodEquivalenceGroup{ + makePodEquivalenceGroup( + BuildTestPod( + "estimatee", + 50, + 100, + WithNamespace("universe"), + WithLabels(map[string]string{ + "app": "estimatee", + })), + 50000, + ), + makePodEquivalenceGroup( + BuildTestPod( + 
"estimatee", + 95, + 190, + WithNamespace("universe"), + WithLabels(map[string]string{ + "app": "estimatee", + })), + 1000, + ), + } + + for i := 0; i < b.N; i++ { + clusterSnapshot := clustersnapshot.NewBasicClusterSnapshot() + clusterSnapshot.AddNode(makeNode(100, 100, 10, "oldnode", "zone-jupiter")) + + predicateChecker, err := predicatechecker.NewTestPredicateChecker() + assert.NoError(b, err) + limiter := NewThresholdBasedEstimationLimiter([]Threshold{NewStaticThreshold(maxNodes, time.Duration(0))}) + processor := NewDecreasingPodOrderer() + estimator := NewBinpackingNodeEstimator(predicateChecker, clusterSnapshot, limiter, processor, nil /* EstimationContext */, nil /* EstimationAnalyserFunc */) + node := makeNode(millicores, memory, podsPerNode, "template", "zone-mars") + nodeInfo := schedulerframework.NewNodeInfo() + nodeInfo.SetNode(node) + + estimatedNodes, estimatedPods := estimator.Estimate(podsEquivalenceGroup, nodeInfo, nil) + assert.Equal(b, expectNodeCount, estimatedNodes) + assert.Equal(b, expectPodCount, len(estimatedPods)) + } +} diff --git a/cluster-autoscaler/estimator/decreasing_pod_orderer.go b/cluster-autoscaler/estimator/decreasing_pod_orderer.go index ac8b930ddd31..8a2be95ff3c8 100644 --- a/cluster-autoscaler/estimator/decreasing_pod_orderer.go +++ b/cluster-autoscaler/estimator/decreasing_pod_orderer.go @@ -27,8 +27,8 @@ import ( // podScoreInfo contains Pod and score that corresponds to how important it is to handle the pod first. type podScoreInfo struct { - score float64 - pod *apiv1.Pod + score float64 + podsEquivalentGroup PodEquivalenceGroup } // DecreasingPodOrderer is the default implementation of the EstimationPodOrderer @@ -42,29 +42,36 @@ func NewDecreasingPodOrderer() *DecreasingPodOrderer { } // Order is the processing func that sorts the pods based on the size of the pod -func (d *DecreasingPodOrderer) Order(pods []*apiv1.Pod, nodeTemplate *framework.NodeInfo, _ cloudprovider.NodeGroup) []*apiv1.Pod { - podInfos := make([]*podScoreInfo, 0, len(pods)) - for _, pod := range pods { - podInfos = append(podInfos, d.calculatePodScore(pod, nodeTemplate)) +func (d *DecreasingPodOrderer) Order(podsEquivalentGroups []PodEquivalenceGroup, nodeTemplate *framework.NodeInfo, _ cloudprovider.NodeGroup) []PodEquivalenceGroup { + podInfos := make([]*podScoreInfo, 0, len(podsEquivalentGroups)) + for _, podsEquivalentGroup := range podsEquivalentGroups { + podInfos = append(podInfos, d.calculatePodScore(podsEquivalentGroup, nodeTemplate)) } sort.Slice(podInfos, func(i, j int) bool { return podInfos[i].score > podInfos[j].score }) - podList := make([]*apiv1.Pod, 0, len(pods)) + sorted := make([]PodEquivalenceGroup, 0, len(podsEquivalentGroups)) for _, podInfo := range podInfos { - podList = append(podList, podInfo.pod) + sorted = append(sorted, podInfo.podsEquivalentGroup) } - return podList + return sorted } // calculatePodScore score for pod and returns podScoreInfo structure. // Score is defined as cpu_sum/node_capacity + mem_sum/node_capacity. // Pods that have bigger requirements should be processed first, thus have higher scores. 
-func (d *DecreasingPodOrderer) calculatePodScore(pod *apiv1.Pod, nodeTemplate *framework.NodeInfo) *podScoreInfo { +func (d *DecreasingPodOrderer) calculatePodScore(podsEquivalentGroup PodEquivalenceGroup, nodeTemplate *framework.NodeInfo) *podScoreInfo { + samplePod := podsEquivalentGroup.Exemplar() + if samplePod == nil { + return &podScoreInfo{ + score: 0, + podsEquivalentGroup: podsEquivalentGroup, + } + } cpuSum := resource.Quantity{} memorySum := resource.Quantity{} - for _, container := range pod.Spec.Containers { + for _, container := range samplePod.Spec.Containers { if request, ok := container.Resources.Requests[apiv1.ResourceCPU]; ok { cpuSum.Add(request) } @@ -81,7 +88,7 @@ func (d *DecreasingPodOrderer) calculatePodScore(pod *apiv1.Pod, nodeTemplate *f } return &podScoreInfo{ - score: score, - pod: pod, + score: score, + podsEquivalentGroup: podsEquivalentGroup, } } diff --git a/cluster-autoscaler/estimator/decreasing_pod_orderer_test.go b/cluster-autoscaler/estimator/decreasing_pod_orderer_test.go index 07c2e0349075..4720805eee6a 100644 --- a/cluster-autoscaler/estimator/decreasing_pod_orderer_test.go +++ b/cluster-autoscaler/estimator/decreasing_pod_orderer_test.go @@ -20,35 +20,35 @@ import ( "testing" "github.com/stretchr/testify/assert" - apiv1 "k8s.io/api/core/v1" + v1 "k8s.io/api/core/v1" "k8s.io/autoscaler/cluster-autoscaler/utils/test" schedulerframework "k8s.io/kubernetes/pkg/scheduler/framework" ) func TestPodPriorityProcessor(t *testing.T) { - p1 := test.BuildTestPod("p1", 1, 1) - p2 := test.BuildTestPod("p2", 2, 1) - p3 := test.BuildTestPod("p3", 2, 100) - node := makeNode(4, 600, "node1", "zone-sun") + pg1 := PodEquivalenceGroup{Pods: []*v1.Pod{test.BuildTestPod("p1", 1, 1)}} + pg2 := PodEquivalenceGroup{Pods: []*v1.Pod{test.BuildTestPod("p2", 2, 1)}} + pg3 := PodEquivalenceGroup{Pods: []*v1.Pod{test.BuildTestPod("p3", 2, 100)}} + node := makeNode(4, 600, 10, "node1", "zone-sun") testCases := map[string]struct { - inputPods []*apiv1.Pod - expectedPods []*apiv1.Pod + inputPodsEquivalentGroup []PodEquivalenceGroup + expectedPodsEquivalentGroup []PodEquivalenceGroup }{ "single pod": { - inputPods: []*apiv1.Pod{p1}, - expectedPods: []*apiv1.Pod{p1}, + inputPodsEquivalentGroup: []PodEquivalenceGroup{pg1}, + expectedPodsEquivalentGroup: []PodEquivalenceGroup{pg1}, }, "sorted list of pods": { - inputPods: []*apiv1.Pod{p3, p2, p1}, - expectedPods: []*apiv1.Pod{p3, p2, p1}, + inputPodsEquivalentGroup: []PodEquivalenceGroup{pg3, pg2, pg1}, + expectedPodsEquivalentGroup: []PodEquivalenceGroup{pg3, pg2, pg1}, }, "randomised list of pods": { - inputPods: []*apiv1.Pod{p1, p3, p2}, - expectedPods: []*apiv1.Pod{p3, p2, p1}, + inputPodsEquivalentGroup: []PodEquivalenceGroup{pg1, pg3, pg2}, + expectedPodsEquivalentGroup: []PodEquivalenceGroup{pg3, pg2, pg1}, }, "empty pod list": { - inputPods: []*apiv1.Pod{}, - expectedPods: []*apiv1.Pod{}, + inputPodsEquivalentGroup: []PodEquivalenceGroup{}, + expectedPodsEquivalentGroup: []PodEquivalenceGroup{}, }, } @@ -59,8 +59,8 @@ func TestPodPriorityProcessor(t *testing.T) { processor := NewDecreasingPodOrderer() nodeInfo := schedulerframework.NewNodeInfo() nodeInfo.SetNode(node) - actual := processor.Order(tc.inputPods, nodeInfo, nil) - assert.Equal(t, tc.expectedPods, actual) + actual := processor.Order(tc.inputPodsEquivalentGroup, nodeInfo, nil) + assert.Equal(t, tc.expectedPodsEquivalentGroup, actual) }) } } diff --git a/cluster-autoscaler/estimator/estimator.go b/cluster-autoscaler/estimator/estimator.go index 
b4d8e179d54d..7323a168e324 100644 --- a/cluster-autoscaler/estimator/estimator.go +++ b/cluster-autoscaler/estimator/estimator.go @@ -35,11 +35,26 @@ const ( // AvailableEstimators is a list of available estimators. var AvailableEstimators = []string{BinpackingEstimatorName} +// PodEquivalenceGroup represents a group of pods, which have the same scheduling +// requirements and are managed by the same controller. +type PodEquivalenceGroup struct { + Pods []*apiv1.Pod +} + +// Exemplar returns an example pod from the group. +func (p *PodEquivalenceGroup) Exemplar() *apiv1.Pod { + if len(p.Pods) == 0 { + return nil + } + return p.Pods[0] +} + // Estimator calculates the number of nodes of given type needed to schedule pods. // It returns the number of new nodes needed as well as the list of pods it managed // to schedule on those nodes. type Estimator interface { - Estimate([]*apiv1.Pod, *schedulerframework.NodeInfo, cloudprovider.NodeGroup) (int, []*apiv1.Pod) + // Estimate estimates how many nodes are needed to provision pods coming from the given equivalence groups. + Estimate([]PodEquivalenceGroup, *schedulerframework.NodeInfo, cloudprovider.NodeGroup) (int, []*apiv1.Pod) } // EstimatorBuilder creates a new estimator object. @@ -67,7 +82,7 @@ func NewEstimatorBuilder(name string, limiter EstimationLimiter, orderer Estimat // scale-up is limited by external factors. type EstimationLimiter interface { // StartEstimation is called at the start of estimation. - StartEstimation([]*apiv1.Pod, cloudprovider.NodeGroup, EstimationContext) + StartEstimation([]PodEquivalenceGroup, cloudprovider.NodeGroup, EstimationContext) // EndEstimation is called at the end of estimation. EndEstimation() // PermissionToAddNode is called by an estimator when it wants to add additional @@ -81,5 +96,5 @@ type EstimationLimiter interface { // EstimationPodOrderer is an interface used to determine the order of the pods // used while binpacking during scale up estimation type EstimationPodOrderer interface { - Order(pods []*apiv1.Pod, nodeTemplate *framework.NodeInfo, nodeGroup cloudprovider.NodeGroup) []*apiv1.Pod + Order(podsEquivalentGroups []PodEquivalenceGroup, nodeTemplate *framework.NodeInfo, nodeGroup cloudprovider.NodeGroup) []PodEquivalenceGroup } diff --git a/cluster-autoscaler/estimator/threshold_based_limiter.go b/cluster-autoscaler/estimator/threshold_based_limiter.go index 57b4e51d5d28..a59f448ac51a 100644 --- a/cluster-autoscaler/estimator/threshold_based_limiter.go +++ b/cluster-autoscaler/estimator/threshold_based_limiter.go @@ -19,7 +19,6 @@ package estimator import ( "time" - apiv1 "k8s.io/api/core/v1" "k8s.io/autoscaler/cluster-autoscaler/cloudprovider" klog "k8s.io/klog/v2" ) @@ -32,7 +31,7 @@ type thresholdBasedEstimationLimiter struct { thresholds []Threshold } -func (tbel *thresholdBasedEstimationLimiter) StartEstimation(_ []*apiv1.Pod, nodeGroup cloudprovider.NodeGroup, context EstimationContext) { +func (tbel *thresholdBasedEstimationLimiter) StartEstimation(_ []PodEquivalenceGroup, nodeGroup cloudprovider.NodeGroup, context EstimationContext) { tbel.start = time.Now() tbel.nodes = 0 tbel.maxNodes = 0 diff --git a/cluster-autoscaler/estimator/threshold_based_limiter_test.go b/cluster-autoscaler/estimator/threshold_based_limiter_test.go index ebcfc0c1ddf6..cf4393fc6ca2 100644 --- a/cluster-autoscaler/estimator/threshold_based_limiter_test.go +++ b/cluster-autoscaler/estimator/threshold_based_limiter_test.go @@ -21,7 +21,6 @@ import ( "time" "github.com/stretchr/testify/assert" - apiv1
"k8s.io/api/core/v1" "k8s.io/autoscaler/cluster-autoscaler/cloudprovider" ) @@ -37,7 +36,7 @@ func expectAllow(t *testing.T, l EstimationLimiter) { func resetLimiter(_ *testing.T, l EstimationLimiter) { l.EndEstimation() - l.StartEstimation([]*apiv1.Pod{}, nil, nil) + l.StartEstimation([]PodEquivalenceGroup{}, nil, nil) } type dynamicThreshold struct { @@ -173,7 +172,7 @@ func TestThresholdBasedLimiter(t *testing.T) { for _, tc := range testCases { t.Run(tc.name, func(t *testing.T) { limiter := NewThresholdBasedEstimationLimiter(tc.thresholds).(*thresholdBasedEstimationLimiter) - limiter.StartEstimation([]*apiv1.Pod{}, nil, nil) + limiter.StartEstimation([]PodEquivalenceGroup{}, nil, nil) if tc.startDelta != time.Duration(0) { limiter.start = limiter.start.Add(tc.startDelta) diff --git a/cluster-autoscaler/utils/test/test_utils.go b/cluster-autoscaler/utils/test/test_utils.go index 8deb51a6a5b2..81c939cbfc43 100644 --- a/cluster-autoscaler/utils/test/test_utils.go +++ b/cluster-autoscaler/utils/test/test_utils.go @@ -103,6 +103,53 @@ func WithNodeName(nodeName string) func(*apiv1.Pod) { } } +// WithNamespace sets the namespace of the pod. +func WithNamespace(namespace string) func(*apiv1.Pod) { + return func(pod *apiv1.Pod) { + pod.ObjectMeta.Namespace = namespace + } +} + +// WithLabels sets the labels on the pod. +func WithLabels(labels map[string]string) func(*apiv1.Pod) { + return func(pod *apiv1.Pod) { + pod.ObjectMeta.Labels = labels + } +} + +// WithHostPort sets a host port on the pod's first container. +func WithHostPort(hostport int32) func(*apiv1.Pod) { + return func(pod *apiv1.Pod) { + if hostport > 0 { + pod.Spec.Containers[0].Ports = []apiv1.ContainerPort{ + { + HostPort: hostport, + }, + } + } + } +} + +// WithMaxSkew adds a topology spread constraint with the given max skew and topology key to the pod. +func WithMaxSkew(maxSkew int32, topologySpreadingKey string) func(*apiv1.Pod) { + return func(pod *apiv1.Pod) { + if maxSkew > 0 { + pod.Spec.TopologySpreadConstraints = []apiv1.TopologySpreadConstraint{ + { + MaxSkew: maxSkew, + TopologyKey: topologySpreadingKey, + WhenUnsatisfiable: "DoNotSchedule", + LabelSelector: &metav1.LabelSelector{ + MatchLabels: map[string]string{ + "app": "estimatee", + }, + }, + }, + } + } + } +} + // BuildTestPodWithEphemeralStorage creates a pod with cpu, memory and ephemeral storage resources. func BuildTestPodWithEphemeralStorage(name string, cpu, mem, ephemeralStorage int64) *apiv1.Pod { startTime := metav1.Unix(0, 0)