Introduce binpacking optimization for similar pods.
The optimization uses the fact that pods which are equivalent do not
need to be checked multiple times against already filled nodes.
This changes the time complexity from O(pods*nodes) to O(pods).
kisieland committed Apr 4, 2024
1 parent 5d0c973 commit 5aa6b2c
Showing 6 changed files with 333 additions and 163 deletions.
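
In other words: all pods in a PodEquivalenceGroup have identical scheduling requirements, so once one of them fails to fit on the nodes already added during the simulation, none of its siblings needs to be re-checked against those nodes. A self-contained toy sketch of that first-fit-with-groups idea (invented types and numbers, not the autoscaler's API):

```go
package main

import "fmt"

type node struct{ free int }

// group models a PodEquivalenceGroup: count pods that all request the same size.
type group struct{ size, count int }

// packGroups runs first-fit bin-packing, but probes the already-opened nodes
// once per group instead of once per pod: since the pods are interchangeable,
// a failed probe for one pod rules out the rest of its group too.
func packGroups(groups []group, capacity int) (newNodes int) {
	var nodes []node
	for _, g := range groups {
		remaining := g.count
		// Phase 1: fill nodes opened so far (a single pass over nodes per group).
		for i := range nodes {
			for remaining > 0 && nodes[i].free >= g.size {
				nodes[i].free -= g.size
				remaining--
			}
		}
		// Phase 2: open new nodes for whatever is left.
		for remaining > 0 {
			n := node{free: capacity}
			for remaining > 0 && n.free >= g.size {
				n.free -= g.size
				remaining--
			}
			nodes = append(nodes, n)
			newNodes++
		}
	}
	return newNodes
}

func main() {
	// 10 identical pods of size 3 onto empty nodes of capacity 10 -> 4 new nodes.
	fmt.Println(packGroups([]group{{size: 3, count: 10}}, 10))
}
```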
4 changes: 2 additions & 2 deletions cluster-autoscaler/core/scaleup/orchestrator/orchestrator.go
@@ -696,10 +696,10 @@ func (o *ScaleUpOrchestrator) ComputeSimilarNodeGroups(
 func matchingSchedulablePodGroups(podGroups []estimator.PodEquivalenceGroup, similarPodGroups []estimator.PodEquivalenceGroup) bool {
 	schedulableSamplePods := make(map[*apiv1.Pod]bool)
 	for _, podGroup := range similarPodGroups {
-		schedulableSamplePods[podGroup.Pods[0]] = true
+		schedulableSamplePods[podGroup.Exemplar()] = true
 	}
 	for _, podGroup := range podGroups {
-		if _, found := schedulableSamplePods[podGroup.Pods[0]]; !found {
+		if _, found := schedulableSamplePods[podGroup.Exemplar()]; !found {
 			return false
 		}
 	}
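The Exemplar() accessor is introduced in one of the other changed files, which is not shown in this excerpt. Judging by the code it replaces, it presumably returns a representative pod of the group, along these lines (a hypothetical sketch, not the committed definition):

```go
// Hypothetical sketch of the accessor used above; the committed definition
// lives elsewhere in cluster-autoscaler/estimator and may differ. Assumed
// behavior: return the group's first pod as its representative, nil if empty.
func (p *PodEquivalenceGroup) Exemplar() *apiv1.Pod {
	if len(p.Pods) == 0 {
		return nil
	}
	return p.Pods[0]
}
```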
202 changes: 133 additions & 69 deletions cluster-autoscaler/estimator/binpacking_estimator.go
@@ -36,6 +36,16 @@ type BinpackingNodeEstimator struct {
 	podOrderer             EstimationPodOrderer
 	context                EstimationContext
 	estimationAnalyserFunc EstimationAnalyserFunc // optional
+
 }
 
+// estimationState contains helper variables to avoid copying them independently in each function.
+type estimationState struct {
+	scheduledPods    []*apiv1.Pod
+	newNodeNameIndex int
+	lastNodeName     string
+	newNodeNames     map[string]bool
+	newNodesWithPods map[string]bool
+}
+
 // NewBinpackingNodeEstimator builds a new BinpackingNodeEstimator.
@@ -57,6 +67,16 @@ func NewBinpackingNodeEstimator(
 	}
 }
 
+func newEstimationState() *estimationState {
+	return &estimationState{
+		scheduledPods:    []*apiv1.Pod{},
+		newNodeNameIndex: 0,
+		lastNodeName:     "",
+		newNodeNames:     map[string]bool{},
+		newNodesWithPods: map[string]bool{},
+	}
+}
+
 // Estimate implements the First-Fit bin-packing approximation algorithm.
 // The ordering of the pods depends on the EstimationPodOrderer; the default
 // order is DecreasingPodOrderer.
@@ -70,105 +90,149 @@ func NewBinpackingNodeEstimator(
 func (e *BinpackingNodeEstimator) Estimate(
 	podsEquivalenceGroups []PodEquivalenceGroup,
 	nodeTemplate *schedulerframework.NodeInfo,
-	nodeGroup cloudprovider.NodeGroup) (int, []*apiv1.Pod) {
+	nodeGroup cloudprovider.NodeGroup,
+) (int, []*apiv1.Pod) {
 
 	e.limiter.StartEstimation(podsEquivalenceGroups, nodeGroup, e.context)
 	defer e.limiter.EndEstimation()
 
 	podsEquivalenceGroups = e.podOrderer.Order(podsEquivalenceGroups, nodeTemplate, nodeGroup)
 
-	newNodeNames := make(map[string]bool)
-	newNodesWithPods := make(map[string]bool)
-
 	e.clusterSnapshot.Fork()
 	defer func() {
 		e.clusterSnapshot.Revert()
 	}()
 
-	newNodeNameIndex := 0
-	scheduledPods := []*apiv1.Pod{}
-	lastNodeName := ""
-
+	estimationState := newEstimationState()
 	for _, podsEquivalenceGroup := range podsEquivalenceGroups {
-		for _, pod := range podsEquivalenceGroup.Pods {
-			found := false
-
-			nodeName, err := e.predicateChecker.FitsAnyNodeMatching(e.clusterSnapshot, pod, func(nodeInfo *schedulerframework.NodeInfo) bool {
-				return newNodeNames[nodeInfo.Node().Name]
-			})
-			if err == nil {
-				found = true
-				if err := e.clusterSnapshot.AddPod(pod, nodeName); err != nil {
-					klog.Errorf("Error adding pod %v.%v to node %v in ClusterSnapshot; %v", pod.Namespace, pod.Name, nodeName, err)
-					return 0, nil
-				}
-				scheduledPods = append(scheduledPods, pod)
-				newNodesWithPods[nodeName] = true
-			}
-
-			if !found {
-				// If the last node we've added is empty and the pod couldn't schedule on it, it wouldn't be able to schedule
-				// on a new node either. There is no point adding more nodes to snapshot in such case, especially because of
-				// performance cost each extra node adds to future FitsAnyNodeMatching calls.
-				if lastNodeName != "" && !newNodesWithPods[lastNodeName] {
-					continue
-				}
-
-				// Stop binpacking if we reach the limit of nodes we can add.
-				// We return the result of the binpacking that we already performed.
-				//
-				// The thresholdBasedEstimationLimiter implementation assumes that for
-				// each call that returns true, one node gets added. Therefore this
-				// must be the last check right before really adding a node.
-				if !e.limiter.PermissionToAddNode() {
-					break
-				}
-
-				// Add new node
-				newNodeName, err := e.addNewNodeToSnapshot(nodeTemplate, newNodeNameIndex)
-				if err != nil {
-					klog.Errorf("Error while adding new node for template to ClusterSnapshot; %v", err)
-					return 0, nil
-				}
-				newNodeNameIndex++
-				newNodeNames[newNodeName] = true
-				lastNodeName = newNodeName
-
-				// And try to schedule pod to it.
-				// Note that this may still fail (ex. if topology spreading with zonal topologyKey is used);
-				// in this case we can't help the pending pod. We keep the node in clusterSnapshot to avoid
-				// adding and removing node to snapshot for each such pod.
-				if err := e.predicateChecker.CheckPredicates(e.clusterSnapshot, pod, newNodeName); err != nil {
-					continue
-				}
-				if err := e.clusterSnapshot.AddPod(pod, newNodeName); err != nil {
-					klog.Errorf("Error adding pod %v.%v to node %v in ClusterSnapshot; %v", pod.Namespace, pod.Name, newNodeName, err)
-					return 0, nil
-				}
-				newNodesWithPods[newNodeName] = true
-				scheduledPods = append(scheduledPods, pod)
-			}
-		}
+		var err error
+		var remainingPods []*apiv1.Pod
+
+		remainingPods, err = e.tryToScheduleOnExistingNodes(estimationState, podsEquivalenceGroup.Pods)
+		if err != nil {
+			klog.Errorf(err.Error())
+			return 0, nil
+		}
+
+		err = e.tryToScheduleOnNewNodes(estimationState, nodeTemplate, remainingPods)
+		if err != nil {
+			klog.Errorf(err.Error())
+			return 0, nil
+		}
 	}
 
 	if e.estimationAnalyserFunc != nil {
-		e.estimationAnalyserFunc(e.clusterSnapshot, nodeGroup, newNodesWithPods)
+		e.estimationAnalyserFunc(e.clusterSnapshot, nodeGroup, estimationState.newNodesWithPods)
 	}
-
-	return len(newNodesWithPods), scheduledPods
+	return len(estimationState.newNodesWithPods), estimationState.scheduledPods
 }
 
+func (e *BinpackingNodeEstimator) tryToScheduleOnExistingNodes(
+	estimationState *estimationState,
+	pods []*apiv1.Pod,
+) ([]*apiv1.Pod, error) {
+	var index int
+	for index = 0; index < len(pods); index++ {
+		pod := pods[index]
+
+		// Check schedulability on all nodes created during simulation
+		nodeName, err := e.predicateChecker.FitsAnyNodeMatching(e.clusterSnapshot, pod, func(nodeInfo *schedulerframework.NodeInfo) bool {
+			return estimationState.newNodeNames[nodeInfo.Node().Name]
+		})
+		if err != nil {
+			break
+		}
+
+		if err := e.tryToAddNode(estimationState, pod, nodeName); err != nil {
+			return nil, err
+		}
+	}
+	return pods[index:], nil
+}
+
+func (e *BinpackingNodeEstimator) tryToScheduleOnNewNodes(
+	estimationState *estimationState,
+	nodeTemplate *schedulerframework.NodeInfo,
+	pods []*apiv1.Pod,
+) error {
+	for _, pod := range pods {
+		found := false
+
+		if estimationState.lastNodeName != "" {
+			// Check schedulability only on the newly created node
+			if err := e.predicateChecker.CheckPredicates(e.clusterSnapshot, pod, estimationState.lastNodeName); err == nil {
+				found = true
+				if err := e.tryToAddNode(estimationState, pod, estimationState.lastNodeName); err != nil {
+					return err
+				}
+			}
+		}
+
+		if !found {
+			// If the last node we've added is empty and the pod couldn't schedule on it, it wouldn't be able to schedule
+			// on a new node either. There is no point adding more nodes to snapshot in such case, especially because of
+			// performance cost each extra node adds to future FitsAnyNodeMatching calls.
+			if estimationState.lastNodeName != "" && !estimationState.newNodesWithPods[estimationState.lastNodeName] {
+				break
+			}
+
+			// Stop binpacking if we reach the limit of nodes we can add.
+			// We return the result of the binpacking that we already performed.
+			//
+			// The thresholdBasedEstimationLimiter implementation assumes that for
+			// each call that returns true, one node gets added. Therefore this
+			// must be the last check right before really adding a node.
+			if !e.limiter.PermissionToAddNode() {
+				break
+			}
+
+			// Add new node
+			if err := e.addNewNodeToSnapshot(estimationState, nodeTemplate); err != nil {
+				return fmt.Errorf("Error while adding new node for template to ClusterSnapshot; %w", err)
+			}
+
+			// And try to schedule pod to it.
+			// Note that this may still fail (ex. if topology spreading with zonal topologyKey is used);
+			// in this case we can't help the pending pod. We keep the node in clusterSnapshot to avoid
+			// adding and removing node to snapshot for each such pod.
+			if err := e.predicateChecker.CheckPredicates(e.clusterSnapshot, pod, estimationState.lastNodeName); err != nil {
+				break
+			}
+			if err := e.tryToAddNode(estimationState, pod, estimationState.lastNodeName); err != nil {
+				return err
+			}
+		}
+	}
+	return nil
+}
+
 func (e *BinpackingNodeEstimator) addNewNodeToSnapshot(
+	estimationState *estimationState,
 	template *schedulerframework.NodeInfo,
-	nameIndex int) (string, error) {
-
-	newNodeInfo := scheduler.DeepCopyTemplateNode(template, fmt.Sprintf("e-%d", nameIndex))
+) error {
+	newNodeInfo := scheduler.DeepCopyTemplateNode(template, fmt.Sprintf("e-%d", estimationState.newNodeNameIndex))
 	var pods []*apiv1.Pod
 	for _, podInfo := range newNodeInfo.Pods {
 		pods = append(pods, podInfo.Pod)
 	}
 	if err := e.clusterSnapshot.AddNodeWithPods(newNodeInfo.Node(), pods); err != nil {
-		return "", err
+		return err
 	}
-	return newNodeInfo.Node().Name, nil
+	estimationState.newNodeNameIndex++
+	estimationState.lastNodeName = newNodeInfo.Node().Name
+	estimationState.newNodeNames[estimationState.lastNodeName] = true
+	return nil
+}
+
+func (e *BinpackingNodeEstimator) tryToAddNode(
+	estimationState *estimationState,
+	pod *apiv1.Pod,
+	nodeName string,
+) error {
+	if err := e.clusterSnapshot.AddPod(pod, nodeName); err != nil {
+		return fmt.Errorf("Error adding pod %v.%v to node %v in ClusterSnapshot; %v", pod.Namespace, pod.Name, nodeName, err)
+	}
+	estimationState.newNodesWithPods[nodeName] = true
+	estimationState.scheduledPods = append(estimationState.scheduledPods, pod)
+	return nil
}
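
Taken together, tryToScheduleOnExistingNodes stops at the first pod of a group that fits nowhere among the simulated nodes, and tryToScheduleOnNewNodes only ever probes the most recently added node. A back-of-the-envelope comparison of predicate-check counts behind the commit message's complexity claim (illustrative numbers, not a benchmark):

```go
package main

import "fmt"

// Rough probe-count comparison for one equivalence group of identical pods.
// Numbers are invented for illustration; real costs depend on the predicates.
func main() {
	pods, filledNodes := 1000, 100

	// Before: every pod is checked against every already-filled node first.
	before := pods * filledNodes

	// After: one exemplar probes the filled nodes; once it fails, the remaining
	// pods of the group only ever probe the newest node.
	after := filledNodes + (pods - 1)

	fmt.Printf("before: %d checks, after: %d checks\n", before, after)
}
```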

