feat: change binder to use public functions from pkg/plugins and to take NMNodes into account when checking.
slipegg committed Sep 25, 2024
1 parent 241cc98 commit d7fc2ed
Showing 3 changed files with 136 additions and 94 deletions.
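
The core of this change is that the binder's conflict checks now resolve candidate nodes from a source that depends on the pod launcher: v1 Nodes for kubelet-launched pods and NMNode CRDs for node-manager-launched pods, with both paths converging on the same NodeInfo-based checks. Below is a minimal standalone sketch of that branching pattern; the nodeNameLister interface and the launcher constants are local stand-ins invented for illustration, not the repository's actual lister or podutil.PodLauncher types.

package sketch

import (
	"fmt"

	"k8s.io/apimachinery/pkg/labels"
)

// nodeNameLister is a local stand-in for this sketch only; both the v1 Node
// lister and the NMNode lister used in the commit expose a selector-based List.
type nodeNameLister interface {
	ListNames(selector labels.Selector) ([]string, error)
}

// launcher mirrors the role of podutil.PodLauncher (values are illustrative).
type launcher string

const (
	kubelet     launcher = "kubelet"
	nodeManager launcher = "node-manager"
)

// nodesForLauncher shows the branching this commit introduces: kubelet pods
// are checked against v1 Nodes, node-manager pods against NMNodes, and both
// results feed the same downstream conflict checks.
func nodesForLauncher(l launcher, v1Nodes, nmNodes nodeNameLister, selector labels.Selector) ([]string, error) {
	switch l {
	case kubelet:
		return v1Nodes.ListNames(selector)
	case nodeManager:
		return nmNodes.ListNames(selector)
	default:
		return nil, fmt.Errorf("unsupported pod launcher: %v", l)
	}
}
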
45 changes: 23 additions & 22 deletions pkg/binder/framework/plugins/interpodaffinity/interpodaffinity.go
@@ -22,6 +22,7 @@ import (

"github.com/kubewharf/godel-scheduler/pkg/binder/framework/handle"
framework "github.com/kubewharf/godel-scheduler/pkg/framework/api"
utils "github.com/kubewharf/godel-scheduler/pkg/plugins/interpodaffinity"
interpodScheduler "github.com/kubewharf/godel-scheduler/pkg/scheduler/framework/plugins/interpodaffinity"
podutil "github.com/kubewharf/godel-scheduler/pkg/util/pod"
v1 "k8s.io/api/core/v1"
@@ -52,15 +53,15 @@ func (pl *InterPodAffinity) CheckConflicts(_ context.Context, cycleState *framew
return framework.NewStatus(framework.Error, err.Error())
}
topologyLabels := nodeInfo.GetNodeLabels(podLauncher)
matchedNodeInfos, err := pl.getNodesWithSameTopologyLabels(topologyLabels)
matchedNodeInfos, err := pl.getNodesWithSameTopologyLabels(topologyLabels, podLauncher)
if err != nil {
return framework.NewStatus(framework.Unschedulable, ErrorReasonWhenFilterNodeWithSameTopology)
}

existingPodAntiAffinityMap := interpodScheduler.GetTPMapMatchingExistingAntiAffinity(pod, matchedNodeInfos, podLauncher)
existingPodAntiAffinityMap := utils.GetTPMapMatchingExistingAntiAffinity(pod, matchedNodeInfos, podLauncher)

podInfo := framework.NewPodInfo(pod)
incomingPodAffinityMap, incomingPodAntiAffinityMap := interpodScheduler.GetTPMapMatchingIncomingAffinityAntiAffinity(podInfo, matchedNodeInfos, podLauncher)
incomingPodAffinityMap, incomingPodAntiAffinityMap := utils.GetTPMapMatchingIncomingAffinityAntiAffinity(podInfo, matchedNodeInfos, podLauncher)

state := &interpodScheduler.PreFilterState{
TopologyToMatchedExistingAntiAffinityTerms: existingPodAntiAffinityMap,
@@ -90,35 +91,35 @@ func New(_ runtime.Object, handle handle.BinderFrameworkHandle) (framework.Plugi
}, nil
}

func (pl *InterPodAffinity) getNodesWithSameTopologyLabels(topologyLabels map[string]string) ([]framework.NodeInfo, error) {
nodeLister := pl.frameworkHandle.SharedInformerFactory().Core().V1().Nodes().Lister()

func (pl *InterPodAffinity) getNodesWithSameTopologyLabels(topologyLabels map[string]string, podLauncher podutil.PodLauncher) ([]framework.NodeInfo, error) {
var matchedNodeInfos []framework.NodeInfo
nodeSet := make(map[string]*v1.Node) // Used to remove duplicates
nodeInfoSet := make(map[string]framework.NodeInfo) // Used to remove duplicates

// Filter nodes for each label key-value pair and merge the results
for key, value := range topologyLabels {
selector := labels.NewSelector()

// Build an equality requirement for each label key-value pair
requirement, _ := labels.NewRequirement(key, selection.Equals, []string{value})
selector = selector.Add(*requirement)

// List the nodes that match the selector
nodes, err := nodeLister.List(selector)
if err != nil {
return nil, fmt.Errorf("failed to list nodes for selector %s: %v", selector.String(), err)
}

// Add the matched nodes to nodeSet, avoiding duplicates
for _, node := range nodes {
nodeSet[node.Name] = node
if podLauncher == podutil.Kubelet {
nodes, err := pl.frameworkHandle.SharedInformerFactory().Core().V1().Nodes().Lister().List(selector)
if err != nil {
return nil, fmt.Errorf("failed to list nodes for selector %s: %v", selector.String(), err)
}
for _, node := range nodes {
nodeInfoSet[node.Name] = pl.frameworkHandle.GetNodeInfo(node.Name)
}
} else {
nodes, err := pl.frameworkHandle.CRDSharedInformerFactory().Node().V1alpha1().NMNodes().Lister().List(selector)
if err != nil {
return nil, fmt.Errorf("failed to list nodes for selector %s: %v", selector.String(), err)
}
for _, node := range nodes {
nodeInfoSet[node.Name] = pl.frameworkHandle.GetNodeInfo(node.Name)
}
}
}

// Convert the deduplicated node set into a slice
for _, node := range nodeSet {
nodeInfo := pl.frameworkHandle.GetNodeInfo(node.Name)
for _, nodeInfo := range nodeInfoSet {
matchedNodeInfos = append(matchedNodeInfos, nodeInfo)
}

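For reference, the selector-building step above uses only standard apimachinery pieces: one equality requirement per topology label key/value, each evaluated as its own selector so the per-key matches can be merged and deduplicated afterwards. A small self-contained sketch of just that step, with hypothetical label values:

package main

import (
	"fmt"

	"k8s.io/apimachinery/pkg/labels"
	"k8s.io/apimachinery/pkg/selection"
)

func main() {
	// Hypothetical topology labels for illustration only.
	topologyLabels := map[string]string{
		"topology.kubernetes.io/zone": "zone-a",
		"kubernetes.io/hostname":      "node-1",
	}
	for key, value := range topologyLabels {
		// One equality requirement per key/value, matching the plugin's loop.
		requirement, err := labels.NewRequirement(key, selection.Equals, []string{value})
		if err != nil {
			fmt.Println("invalid requirement:", err)
			continue
		}
		selector := labels.NewSelector().Add(*requirement)
		fmt.Println(selector.String()) // e.g. "kubernetes.io/hostname=node-1"
	}
}
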
102 changes: 69 additions & 33 deletions pkg/binder/framework/plugins/podtopologyspread/podtopologyspread.go
@@ -24,10 +24,12 @@ import (
"github.com/kubewharf/godel-scheduler/pkg/binder/framework/handle"
framework "github.com/kubewharf/godel-scheduler/pkg/framework/api"
"github.com/kubewharf/godel-scheduler/pkg/plugins/helper"
"github.com/kubewharf/godel-scheduler/pkg/plugins/podlauncher"
utils "github.com/kubewharf/godel-scheduler/pkg/plugins/podtopologyspread"
"github.com/kubewharf/godel-scheduler/pkg/scheduler/apis/config"
"github.com/kubewharf/godel-scheduler/pkg/scheduler/apis/validation"
podtopologyspreadScheduler "github.com/kubewharf/godel-scheduler/pkg/scheduler/framework/plugins/podtopologyspread"
"github.com/kubewharf/godel-scheduler/pkg/util/parallelize"
podutil "github.com/kubewharf/godel-scheduler/pkg/util/pod"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
@@ -40,9 +42,9 @@ const (
)

type TopologySpreadCondition struct {
Constraints []podtopologyspreadScheduler.TopologySpreadConstraint
TpKeyToCriticalPaths map[string]*podtopologyspreadScheduler.CriticalPaths
TpPairToMatchNum map[podtopologyspreadScheduler.TopologyPair]*int32
Constraints []utils.TopologySpreadConstraint
TpKeyToCriticalPaths map[string]*utils.CriticalPaths
TpPairToMatchNum map[utils.TopologyPair]*int32
}

type PodTopologySpreadCheck struct {
Expand All @@ -57,20 +59,25 @@ func (pl *PodTopologySpreadCheck) Name() string {
}

func (pl *PodTopologySpreadCheck) CheckConflicts(_ context.Context, cycleState *framework.CycleState, pod *v1.Pod, nodeInfo framework.NodeInfo) *framework.Status {
topologySpreadCondition, err := pl.getTopologyCondition(pod)
podLauncher, status := podlauncher.NodeFits(cycleState, pod, nodeInfo)
if status != nil {
return status
}

topologySpreadCondition, err := pl.getTopologyCondition(pod, podLauncher)
if err != nil {
return framework.NewStatus(framework.Error, err.Error())
}

if errReason := pl.isSatisfyPodTopologySpreadConstraints(pod, nodeInfo, topologySpreadCondition); errReason == "" {
if errReason := pl.isSatisfyPodTopologySpreadConstraints(pod, nodeInfo, topologySpreadCondition, podLauncher); errReason == "" {
return nil
} else {
return framework.NewStatus(framework.Unschedulable, errReason)
}
}

func New(plArgs runtime.Object, handle handle.BinderFrameworkHandle) (framework.Plugin, error) {
args, err := podtopologyspreadScheduler.GetArgs(plArgs)
args, err := utils.GetArgs(plArgs)
if err != nil {
return nil, err
}
@@ -94,8 +101,8 @@ func New(plArgs runtime.Object, handle handle.BinderFrameworkHandle) (framework.
// defaultConstraints builds the constraints for a pod using
// .DefaultConstraints and the selectors from the services, replication
// controllers, replica sets and stateful sets that match the pod.
func (pl *PodTopologySpreadCheck) defaultConstraints(p *v1.Pod, action v1.UnsatisfiableConstraintAction) ([]podtopologyspreadScheduler.TopologySpreadConstraint, error) {
constraints, err := podtopologyspreadScheduler.FilterTopologySpreadConstraints(pl.args.DefaultConstraints, action)
func (pl *PodTopologySpreadCheck) defaultConstraints(p *v1.Pod, action v1.UnsatisfiableConstraintAction) ([]utils.TopologySpreadConstraint, error) {
constraints, err := utils.FilterTopologySpreadConstraints(pl.args.DefaultConstraints, action)
if err != nil || len(constraints) == 0 {
return nil, err
}
@@ -111,16 +118,13 @@ func (pl *PodTopologySpreadCheck) defaultConstraints(p *v1.Pod, action v1.Unsati
return constraints, nil
}

func (pl *PodTopologySpreadCheck) getTopologyCondition(pod *v1.Pod) (*TopologySpreadCondition, error) {
constraints := []podtopologyspreadScheduler.TopologySpreadConstraint{}
allNodes, err := pl.frameworkHandle.SharedInformerFactory().Core().V1().Nodes().Lister().List(labels.Everything())
if err != nil {
return nil, err
}
func (pl *PodTopologySpreadCheck) getTopologyCondition(pod *v1.Pod, podLauncher podutil.PodLauncher) (*TopologySpreadCondition, error) {
var err error
constraints := []utils.TopologySpreadConstraint{}
if len(pod.Spec.TopologySpreadConstraints) > 0 {
// We have feature gating in APIServer to strip the spec
// so don't need to re-check feature gate, just check length of Constraints.
constraints, err = podtopologyspreadScheduler.FilterTopologySpreadConstraints(pod.Spec.TopologySpreadConstraints, v1.DoNotSchedule)
constraints, err = utils.FilterTopologySpreadConstraints(pod.Spec.TopologySpreadConstraints, v1.DoNotSchedule)
if err != nil {
return nil, fmt.Errorf("obtaining pod's hard topology spread constraints: %v", err)
}
@@ -134,50 +138,53 @@ func (pl *PodTopologySpreadCheck) getTopologyCondition(pod *v1.Pod) (*TopologySp
return &TopologySpreadCondition{}, nil
}

nodeInfos, err := pl.getAllNodeInfos(podLauncher)
if err != nil {
return nil, err
}
topologySpreadCondition := TopologySpreadCondition{
Constraints: constraints,
TpKeyToCriticalPaths: make(map[string]*podtopologyspreadScheduler.CriticalPaths, len(constraints)),
TpPairToMatchNum: make(map[podtopologyspreadScheduler.TopologyPair]*int32, podtopologyspreadScheduler.SizeHeuristic(len(allNodes), constraints)),
TpKeyToCriticalPaths: make(map[string]*utils.CriticalPaths, len(constraints)),
TpPairToMatchNum: make(map[utils.TopologyPair]*int32, utils.SizeHeuristic(len(nodeInfos), constraints)),
}

for _, node := range allNodes {
for _, nodeInfo := range nodeInfos {
nodeLabels := nodeInfo.GetNodeLabels(podLauncher)
// In accordance to design, if NodeAffinity or NodeSelector is defined,
// spreading is applied to nodes that pass those filters.
nodeInfo := framework.NewNodeInfo()
nodeInfo.SetNode(node)
if !helper.PodMatchesNodeSelectorAndAffinityTerms(pod, nodeInfo) {
continue
}
// Ensure current node's labels contains all topologyKeys in 'Constraints'.
if !podtopologyspreadScheduler.NodeLabelsMatchSpreadConstraints(node.Labels, constraints) {
if !utils.NodeLabelsMatchSpreadConstraints(nodeLabels, constraints) {
continue
}
for _, c := range constraints {
pair := podtopologyspreadScheduler.TopologyPair{Key: c.TopologyKey, Value: node.Labels[c.TopologyKey]}
pair := utils.TopologyPair{Key: c.TopologyKey, Value: nodeLabels[c.TopologyKey]}
topologySpreadCondition.TpPairToMatchNum[pair] = new(int32)
}
}

processNode := func(i int) {
node := allNodes[i]
nodeInfo := pl.frameworkHandle.GetNodeInfo(node.Name)
nodeInfo := nodeInfos[i]
nodeLabels := nodeInfo.GetNodeLabels(podLauncher)

for _, constraint := range constraints {
pair := podtopologyspreadScheduler.TopologyPair{Key: constraint.TopologyKey, Value: node.Labels[constraint.TopologyKey]}
pair := utils.TopologyPair{Key: constraint.TopologyKey, Value: nodeLabels[constraint.TopologyKey]}
tpCount := topologySpreadCondition.TpPairToMatchNum[pair]
if tpCount == nil {
continue
}
count := podtopologyspreadScheduler.CountPodsMatchSelector(nodeInfo.GetPods(), constraint.Selector, pod.Namespace)
count := utils.CountPodsMatchSelector(nodeInfo.GetPods(), constraint.Selector, pod.Namespace)
atomic.AddInt32(tpCount, int32(count))
}
}
parallelize.Until(context.Background(), len(allNodes), processNode)
parallelize.Until(context.Background(), len(nodeInfos), processNode)

// calculate min match for each topology pair
for i := 0; i < len(constraints); i++ {
key := constraints[i].TopologyKey
topologySpreadCondition.TpKeyToCriticalPaths[key] = podtopologyspreadScheduler.NewCriticalPaths()
topologySpreadCondition.TpKeyToCriticalPaths[key] = utils.NewCriticalPaths()
}
for pair, num := range topologySpreadCondition.TpPairToMatchNum {
topologySpreadCondition.TpKeyToCriticalPaths[pair.Key].Update(pair.Value, *num)
@@ -186,17 +193,46 @@ func (pl *PodTopologySpreadCheck) getTopologyCondition(pod *v1.Pod) (*TopologySp
return &topologySpreadCondition, nil
}

func (pl *PodTopologySpreadCheck) getAllNodeInfos(podLauncher podutil.PodLauncher) ([]framework.NodeInfo, error) {
if podLauncher == podutil.Kubelet {
allV1Nodes, err := pl.frameworkHandle.SharedInformerFactory().Core().V1().Nodes().Lister().List(labels.Everything())
if err != nil {
return nil, err
}

nodeInfos := make([]framework.NodeInfo, 0, len(allV1Nodes))
for _, node := range allV1Nodes {
nodeInfos = append(nodeInfos, pl.frameworkHandle.GetNodeInfo(node.Name))
}

return nodeInfos, nil
} else if podLauncher == podutil.NodeManager {
allNMNodes, err := pl.frameworkHandle.CRDSharedInformerFactory().Node().V1alpha1().NMNodes().Lister().List(labels.Everything())
if err != nil {
return nil, err
}

nodeInfos := make([]framework.NodeInfo, 0, len(allNMNodes))
for _, node := range allNMNodes {
nodeInfos = append(nodeInfos, pl.frameworkHandle.GetNodeInfo(node.Name))
}

return nodeInfos, nil
}
return nil, fmt.Errorf("unsupported pod launcher: %v", podLauncher)
}

func (pl *PodTopologySpreadCheck) isSatisfyPodTopologySpreadConstraints(pod *v1.Pod, nodeInfo framework.NodeInfo,
topologySpreadCondition *TopologySpreadCondition) string {
topologySpreadCondition *TopologySpreadCondition, podLauncher podutil.PodLauncher) string {
if topologySpreadCondition == nil || len(topologySpreadCondition.Constraints) == 0 {
return ""
}

node := nodeInfo.GetNode()
nodeLabels := nodeInfo.GetNodeLabels(podLauncher)
podLabelSet := labels.Set(pod.Labels)
for _, c := range topologySpreadCondition.Constraints {
tpKey := c.TopologyKey
tpVal, ok := node.Labels[c.TopologyKey]
tpVal, ok := nodeLabels[c.TopologyKey]
if !ok {
return ErrReasonPodTopologySpreadNodeLabelNotMatch
}
@@ -206,7 +242,7 @@ func (pl *PodTopologySpreadCheck) isSatisfyPodTopologySpreadConstraints(pod *v1.
selfMatchNum = 1
}

pair := podtopologyspreadScheduler.TopologyPair{Key: tpKey, Value: tpVal}
pair := utils.TopologyPair{Key: tpKey, Value: tpVal}
paths, ok := topologySpreadCondition.TpKeyToCriticalPaths[tpKey]
if !ok {
// error which should not happen
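
The counting step in getTopologyCondition above accumulates per-topology-pair match counts concurrently, which is why TpPairToMatchNum holds *int32 values updated with atomic.AddInt32. A stripped-down sketch of that accumulation pattern, with the node and pod data reduced to plain counts (the names here are illustrative, not the repository's API):

package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

type topologyPair struct {
	Key   string
	Value string
}

func main() {
	// Illustrative per-node data: each node's topology pair and how many of
	// its pods match the incoming pod's spread selector.
	nodes := []struct {
		pair    topologyPair
		matched int32
	}{
		{topologyPair{"zone", "zone-a"}, 2},
		{topologyPair{"zone", "zone-b"}, 1},
		{topologyPair{"zone", "zone-a"}, 3},
	}

	// Pre-register the pairs, then add counts concurrently, mirroring the
	// map of *int32 counters updated with atomic.AddInt32.
	counts := make(map[topologyPair]*int32)
	for _, n := range nodes {
		counts[n.pair] = new(int32)
	}

	var wg sync.WaitGroup
	for _, n := range nodes {
		wg.Add(1)
		go func(pair topologyPair, matched int32) {
			defer wg.Done()
			atomic.AddInt32(counts[pair], matched)
		}(n.pair, n.matched)
	}
	wg.Wait()

	for pair, c := range counts {
		fmt.Printf("%s=%s -> %d matching pods\n", pair.Key, pair.Value, *c)
	}
}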