Commit 3690c5e

filter dedicated consumer in cnr

WangZzzhe committed Dec 27, 2023
1 parent a07642e
Showing 12 changed files with 368 additions and 51 deletions.
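In short: the commit threads an optional consumer filter (a podFilter func) through the scheduler's CNR cache. When the resource plugin policy is dynamic, the TopologyMatch plugin now counts only allocations belonging to dedicated-cores pods while assembling a node's NUMA topology; every other caller passes nil and keeps the old behavior. The filter tests gain a testSharedLister plus a scheduler framework handle so the plugin can be built against real NodeInfo objects.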
4 changes: 2 additions & 2 deletions pkg/scheduler/cache/cache.go
@@ -143,13 +143,13 @@ func (cache *extendedCache) UnreserveNodeResource(nodeName string, pod *v1.Pod)
 }
 
 // GetNodeResourceTopology assumedPodResource will be added to nodeResourceTopology
-func (cache *extendedCache) GetNodeResourceTopology(nodeName string) *ResourceTopology {
+func (cache *extendedCache) GetNodeResourceTopology(nodeName string, filterFn podFilter) *ResourceTopology {
 	cache.mu.RLock()
 	defer cache.mu.RUnlock()
 
 	nodeInfo, ok := cache.nodes[nodeName]
 	if !ok {
 		return nil
 	}
-	return nodeInfo.GetResourceTopologyCopy()
+	return nodeInfo.GetResourceTopologyCopy(filterFn)
 }
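The extra parameter is backward compatible: a nil filter returns the same unfiltered copy as before. A minimal caller sketch, assuming the exported cache package API shown above (topologyFor and its namespace predicate are hypothetical, and the namespace/name/uid consumer format is an assumption inferred from the fake-consumer format later in this commit):

```go
import (
	"strings"

	"github.com/kubewharf/katalyst-core/pkg/scheduler/cache"
)

// topologyFor is a hypothetical wrapper showing both call styles.
func topologyFor(nodeName, onlyNamespace string) *cache.ResourceTopology {
	if onlyNamespace == "" {
		// nil filter: keep every allocation (the pre-commit behavior).
		return cache.GetCache().GetNodeResourceTopology(nodeName, nil)
	}
	return cache.GetCache().GetNodeResourceTopology(nodeName, func(consumer string) bool {
		// Assumed consumer format: "namespace/name/uid".
		return strings.HasPrefix(consumer, onlyNamespace+"/")
	})
}
```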
4 changes: 2 additions & 2 deletions pkg/scheduler/cache/nodeinfo.go
@@ -190,13 +190,13 @@ func (n *NodeInfo) DeleteAssumedPod(pod *v1.Pod) {
 	n.AssumedPodResources.DeletePod(pod)
 }
 
-func (n *NodeInfo) GetResourceTopologyCopy() *ResourceTopology {
+func (n *NodeInfo) GetResourceTopologyCopy(filterFn podFilter) *ResourceTopology {
 	n.Mutex.RLock()
 	defer n.Mutex.RUnlock()
 
 	if n.ResourceTopology == nil {
 		return nil
 	}
 
-	return n.ResourceTopology.WithPodReousrce(n.AssumedPodResources)
+	return n.ResourceTopology.WithPodReousrce(n.AssumedPodResources, filterFn)
 }
29 changes: 21 additions & 8 deletions pkg/scheduler/cache/resourcetopology.go
@@ -29,6 +29,8 @@ type ResourceTopology struct {
 	TopologyPolicy v1alpha1.TopologyPolicy
 }
 
+type podFilter func(consumer string) bool
+
 func (rt *ResourceTopology) Update(nrt *v1alpha1.CustomNodeResource) {
 	cp := nrt.DeepCopy()
 
@@ -39,28 +41,39 @@ func (rt *ResourceTopology) Update(nrt *v1alpha1.CustomNodeResource) {
 
 // WithPodReousrce add assumedPodResource to ResourceTopology,
 // performing pessimistic overallocation across all the NUMA zones.
-func (rt *ResourceTopology) WithPodReousrce(podResource native.PodResource) *ResourceTopology {
+func (rt *ResourceTopology) WithPodReousrce(podResource native.PodResource, filter podFilter) *ResourceTopology {
 	cp := rt.DeepCopy()
 
 	if len(podResource) == 0 {
 		return cp
 	}
-	for _, topologyZone := range cp.TopologyZone {
-		if topologyZone.Type != v1alpha1.TopologyTypeSocket {
+	for i := range cp.TopologyZone {
+		if cp.TopologyZone[i].Type != v1alpha1.TopologyTypeSocket {
 			continue
 		}
-		for _, child := range topologyZone.Children {
+		for j, child := range cp.TopologyZone[i].Children {
 			if child.Type != v1alpha1.TopologyTypeNuma {
 				continue
 			}
+			allocation := make([]*v1alpha1.Allocation, 0)
+
+			if filter != nil {
+				for _, alloc := range child.Allocations {
+					if filter(alloc.Consumer) {
+						allocation = append(allocation, alloc.DeepCopy())
+					}
+				}
+			} else {
+				allocation = append(allocation, child.Allocations...)
+			}
+
 			for key, podReq := range podResource {
 				copyReq := podReq.DeepCopy()
 				fakeAllocation := v1alpha1.Allocation{
 					Consumer: fmt.Sprintf("fake-consumer/%s/uid", key),
 					Requests: &copyReq,
 				}
-				child.Allocations = append(child.Allocations, &fakeAllocation)
+				allocation = append(allocation, &fakeAllocation)
 			}
+
+			cp.TopologyZone[i].Children[j].Allocations = allocation
 		}
 	}
 
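WithPodReousrce now rebuilds each NUMA zone's Allocations slice on a deep copy: real allocations survive only if they pass the filter (all of them when the filter is nil), and the assumed pods are then overlaid as fake-consumer allocations on every zone, preserving the pessimistic overallocation described in the comment. Anything matching the podFilter signature can be supplied; as a minimal sketch, a hypothetical allowlist-based filter (not part of this commit) could look like:

```go
// keepConsumers builds a podFilter admitting only the listed consumers.
// Hypothetical helper for illustration; the scheduler's real predicate is
// tm.dedicatedPodsFilter in the noderesourcetopology plugin (next file).
func keepConsumers(consumers ...string) podFilter {
	allowed := make(map[string]struct{}, len(consumers))
	for _, c := range consumers {
		allowed[c] = struct{}{}
	}
	return func(consumer string) bool {
		_, ok := allowed[consumer]
		return ok
	}
}
```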
12 changes: 10 additions & 2 deletions pkg/scheduler/plugins/noderesourcetopology/filter.go
@@ -53,8 +53,16 @@ func (tm *TopologyMatch) Filter(ctx context.Context, cycleState *framework.Cycle
 		return nil
 	}
 
-	nodeName := nodeInfo.Node().Name
-	nodeResourceTopologycache := cache.GetCache().GetNodeResourceTopology(nodeName)
+	var (
+		nodeName                  = nodeInfo.Node().Name
+		nodeResourceTopologycache *cache.ResourceTopology
+	)
+	if consts.ResourcePluginPolicyNameDynamic == tm.resourcePolicy {
+		// only dedicated pods will participate in the calculation
+		nodeResourceTopologycache = cache.GetCache().GetNodeResourceTopology(nodeName, tm.dedicatedPodsFilter(nodeInfo))
+	} else {
+		nodeResourceTopologycache = cache.GetCache().GetNodeResourceTopology(nodeName, nil)
+	}
 	if nodeResourceTopologycache == nil {
 		return nil
 	}
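tm.dedicatedPodsFilter itself is not part of the diff shown here. A sketch of what it might look like, under two assumptions flagged in the comments (consumer strings follow the namespace/name/uid convention implied by the fake-consumer format above, and dedicated pods are recognized by the same katalyst QoS-level annotation the tests below use):

```go
// dedicatedPodsFilter returns a predicate admitting only allocations whose
// consumer is a dedicated-cores pod on this node. Sketch only; not the
// actual implementation from this commit.
func (tm *TopologyMatch) dedicatedPodsFilter(nodeInfo *framework.NodeInfo) func(consumer string) bool {
	// Index the node's dedicated-cores pods by "namespace/name".
	dedicated := make(map[string]struct{}, len(nodeInfo.Pods))
	for _, podInfo := range nodeInfo.Pods {
		pod := podInfo.Pod
		if pod.Annotations[consts.PodAnnotationQoSLevelKey] == consts.PodAnnotationQoSLevelDedicatedCores {
			dedicated[pod.Namespace+"/"+pod.Name] = struct{}{}
		}
	}
	return func(consumer string) bool {
		// Assumed consumer format: "namespace/name/uid".
		parts := strings.Split(consumer, "/")
		if len(parts) != 3 {
			return false
		}
		_, ok := dedicated[parts[0]+"/"+parts[1]]
		return ok
	}
}
```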
202 changes: 182 additions & 20 deletions pkg/scheduler/plugins/noderesourcetopology/filter_test.go
@@ -26,6 +26,7 @@ import (
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/kubernetes/pkg/scheduler/apis/config"
 	"k8s.io/kubernetes/pkg/scheduler/framework"
+	"k8s.io/kubernetes/pkg/scheduler/framework/runtime"
 
 	"github.com/kubewharf/katalyst-api/pkg/apis/node/v1alpha1"
 	"github.com/kubewharf/katalyst-api/pkg/consts"
@@ -34,7 +35,63 @@ import (
 	"github.com/kubewharf/katalyst-core/pkg/scheduler/util"
 )
 
-func makeTestFilterNodes(policy v1alpha1.TopologyPolicy) ([]*v1alpha1.CustomNodeResource, []string) {
+var _ framework.SharedLister = &testSharedLister{}
+
+type testSharedLister struct {
+	nodes       []*v1.Node
+	nodeInfos   []*framework.NodeInfo
+	nodeInfoMap map[string]*framework.NodeInfo
+}
+
+func (f *testSharedLister) NodeInfos() framework.NodeInfoLister {
+	return f
+}
+
+func (f *testSharedLister) List() ([]*framework.NodeInfo, error) {
+	return f.nodeInfos, nil
+}
+
+func (f *testSharedLister) HavePodsWithAffinityList() ([]*framework.NodeInfo, error) {
+	return nil, nil
+}
+
+func (f *testSharedLister) HavePodsWithRequiredAntiAffinityList() ([]*framework.NodeInfo, error) {
+	return nil, nil
+}
+
+func (f *testSharedLister) Get(nodeName string) (*framework.NodeInfo, error) {
+	return f.nodeInfoMap[nodeName], nil
+}
+
+func newTestSharedLister(pods []*v1.Pod, nodes []*v1.Node) *testSharedLister {
+	nodeInfoMap := make(map[string]*framework.NodeInfo)
+	nodeInfos := make([]*framework.NodeInfo, 0)
+	for _, pod := range pods {
+		nodeName := pod.Spec.NodeName
+		if _, ok := nodeInfoMap[nodeName]; !ok {
+			nodeInfoMap[nodeName] = framework.NewNodeInfo()
+		}
+		nodeInfoMap[nodeName].AddPod(pod)
+	}
+	for _, node := range nodes {
+		if _, ok := nodeInfoMap[node.Name]; !ok {
+			nodeInfoMap[node.Name] = framework.NewNodeInfo()
+		}
+		nodeInfoMap[node.Name].SetNode(node)
+	}
+
+	for _, v := range nodeInfoMap {
+		nodeInfos = append(nodeInfos, v)
+	}
+
+	return &testSharedLister{
+		nodes:       nodes,
+		nodeInfos:   nodeInfos,
+		nodeInfoMap: nodeInfoMap,
+	}
+}
+
+func makeTestFilterNodes(policy v1alpha1.TopologyPolicy) ([]*v1alpha1.CustomNodeResource, []string, []*v1.Pod) {
 	cnrs := []*v1alpha1.CustomNodeResource{
 		{
 			ObjectMeta: metav1.ObjectMeta{Name: "node-2numa-8c16g"},
@@ -373,7 +430,71 @@ func makeTestFilterNodes(policy v1alpha1.TopologyPolicy) ([]*v1alpha1.CustomNode
 			},
 		},
 	}
-	return cnrs, []string{"node-2numa-8c16g", "node-2numa-4c8g", "node-2numa-8c16g-with-allocation", "node-4numa-8c16g-cross-socket", "node-4numa-8c16g-full-socket"}
+
+	pods := []*v1.Pod{
+		{
+			ObjectMeta: metav1.ObjectMeta{
+				Namespace: "testNamespace",
+				Name:      "testPod1",
+				Annotations: map[string]string{
+					consts.PodAnnotationQoSLevelKey: consts.PodAnnotationQoSLevelDedicatedCores,
+				},
+			},
+			Spec: v1.PodSpec{
+				NodeName: "node-2numa-8c16g-with-allocation",
+			},
+		},
+		{
+			ObjectMeta: metav1.ObjectMeta{
+				Namespace: "testNamespace",
+				Name:      "testPod2",
+				Annotations: map[string]string{
+					consts.PodAnnotationQoSLevelKey: consts.PodAnnotationQoSLevelDedicatedCores,
+				},
+			},
+			Spec: v1.PodSpec{
+				NodeName: "node-4numa-8c16g-cross-socket",
+			},
+		},
+		{
+			ObjectMeta: metav1.ObjectMeta{
+				Namespace: "testNamespace",
+				Name:      "testPod3",
+				Annotations: map[string]string{
+					consts.PodAnnotationQoSLevelKey: consts.PodAnnotationQoSLevelDedicatedCores,
+				},
+			},
+			Spec: v1.PodSpec{
+				NodeName: "node-4numa-8c16g-cross-socket",
+			},
+		},
+		{
+			ObjectMeta: metav1.ObjectMeta{
+				Namespace: "testNamespace",
+				Name:      "testPod4",
+				Annotations: map[string]string{
+					consts.PodAnnotationQoSLevelKey: consts.PodAnnotationQoSLevelDedicatedCores,
+				},
+			},
+			Spec: v1.PodSpec{
+				NodeName: "node-4numa-8c16g-full-socket",
+			},
+		},
+		{
+			ObjectMeta: metav1.ObjectMeta{
+				Namespace: "testNamespace",
+				Name:      "testPod5",
+				Annotations: map[string]string{
+					consts.PodAnnotationQoSLevelKey: consts.PodAnnotationQoSLevelDedicatedCores,
+				},
+			},
+			Spec: v1.PodSpec{
+				NodeName: "node-4numa-8c16g-full-socket",
+			},
+		},
+	}
+
+	return cnrs, []string{"node-2numa-8c16g", "node-2numa-4c8g", "node-2numa-8c16g-with-allocation", "node-4numa-8c16g-cross-socket", "node-4numa-8c16g-full-socket"}, pods
 }
 
 func TestFilterNative(t *testing.T) {
@@ -500,23 +621,37 @@ func TestFilterNative(t *testing.T) {
 	c := cache.GetCache()
 	util.SetQoSConfig(generic.NewQoSConfiguration())
 	for _, tc := range nativeTestCase {
-		cnrs, nodes := makeTestFilterNodes(tc.policy)
+		cnrs, nodeNames, pods := makeTestFilterNodes(tc.policy)
 		for _, cnr := range cnrs {
 			c.AddOrUpdateCNR(cnr)
 		}
 
-		tm, err := MakeTestTm(MakeTestArgs(config.MostAllocated, tc.alignedResource, "native"))
-		assert.NoError(t, err)
 
 		ret := make(map[string]*framework.Status)
-		for _, node := range nodes {
+		nodeInfos := make([]*framework.NodeInfo, 0)
+		nodes := make([]*v1.Node, 0)
+		for _, node := range nodeNames {
 			n := &v1.Node{}
 			n.SetName(node)
+			nodes = append(nodes, n)
 			nodeInfo := framework.NewNodeInfo()
 			nodeInfo.SetNode(n)
+			for _, pod := range pods {
+				nodeInfo.AddPod(pod)
+			}
+			nodeInfos = append(nodeInfos, nodeInfo)
+		}
+		f, err := runtime.NewFramework(nil, nil,
+			runtime.WithSnapshotSharedLister(newTestSharedLister(pods, nodes)))
+		assert.NoError(t, err)
+
+		tm, err := MakeTestTm(MakeTestArgs(config.MostAllocated, tc.alignedResource, "native"), f)
+		assert.NoError(t, err)
+
+		for _, nodeInfo := range nodeInfos {
 			status := tm.(*TopologyMatch).Filter(context.TODO(), nil, tc.pod, nodeInfo)
-			ret[node] = status
+			ret[nodeInfo.Node().Name] = status
 		}
 
 		// check result
 		for wantN, wantS := range tc.wantRes {
 			if wantS == nil {
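The same scaffolding repeats in the two dedicated-policy tests below: build a v1.Node and a framework.NodeInfo per node name, add every test pod to each NodeInfo, wrap pods and nodes in newTestSharedLister, and pass the resulting framework handle into MakeTestTm; presumably the handle is what lets the plugin resolve pod objects through the snapshot when building its dedicated-pods filter.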
@@ -678,23 +813,37 @@ func TestFilterDedicatedNumaBinding(t *testing.T) {
 	c := cache.GetCache()
 	util.SetQoSConfig(generic.NewQoSConfiguration())
 	for _, tc := range numaBindingCase {
-		cnrs, nodes := makeTestFilterNodes(tc.policy)
+		cnrs, nodeNames, pods := makeTestFilterNodes(tc.policy)
 		for _, cnr := range cnrs {
 			c.AddOrUpdateCNR(cnr)
 		}
 
-		tm, err := MakeTestTm(MakeTestArgs(config.MostAllocated, tc.alignedResource, "dynamic"))
-		assert.NoError(t, err)
 
 		ret := make(map[string]*framework.Status)
-		for _, node := range nodes {
+		nodeInfos := make([]*framework.NodeInfo, 0)
+		nodes := make([]*v1.Node, 0)
+		for _, node := range nodeNames {
 			n := &v1.Node{}
 			n.SetName(node)
+			nodes = append(nodes, n)
 			nodeInfo := framework.NewNodeInfo()
 			nodeInfo.SetNode(n)
+			for _, pod := range pods {
+				nodeInfo.AddPod(pod)
+			}
+			nodeInfos = append(nodeInfos, nodeInfo)
+		}
+		f, err := runtime.NewFramework(nil, nil,
+			runtime.WithSnapshotSharedLister(newTestSharedLister(pods, nodes)))
+		assert.NoError(t, err)
+
+		tm, err := MakeTestTm(MakeTestArgs(config.MostAllocated, tc.alignedResource, "dynamic"), f)
+		assert.NoError(t, err)
+
+		for _, nodeInfo := range nodeInfos {
 			status := tm.(*TopologyMatch).Filter(context.TODO(), nil, tc.pod, nodeInfo)
-			ret[node] = status
+			ret[nodeInfo.Node().Name] = status
 		}
 
 		// check result
 		for wantN, wantS := range tc.wantRes {
 			if wantS == nil {
@@ -809,22 +958,35 @@ func TestFilterDedicatedExclusive(t *testing.T) {
 	c := cache.GetCache()
 	util.SetQoSConfig(generic.NewQoSConfiguration())
 	for _, tc := range numaExclusiveCase {
-		cnrs, nodes := makeTestFilterNodes(tc.policy)
+		cnrs, nodeNames, pods := makeTestFilterNodes(tc.policy)
 		for _, cnr := range cnrs {
 			c.AddOrUpdateCNR(cnr)
 		}
 
-		tm, err := MakeTestTm(MakeTestArgs(config.MostAllocated, tc.alignedResource, "dynamic"))
-		assert.NoError(t, err)
 
 		ret := make(map[string]*framework.Status)
-		for _, node := range nodes {
+		nodeInfos := make([]*framework.NodeInfo, 0)
+		nodes := make([]*v1.Node, 0)
+		for _, node := range nodeNames {
 			n := &v1.Node{}
 			n.SetName(node)
+			nodes = append(nodes, n)
 			nodeInfo := framework.NewNodeInfo()
 			nodeInfo.SetNode(n)
+			for _, pod := range pods {
+				nodeInfo.AddPod(pod)
+			}
+			nodeInfos = append(nodeInfos, nodeInfo)
+		}
+		f, err := runtime.NewFramework(nil, nil,
+			runtime.WithSnapshotSharedLister(newTestSharedLister(pods, nodes)))
+		assert.NoError(t, err)
+
+		tm, err := MakeTestTm(MakeTestArgs(config.MostAllocated, tc.alignedResource, "dynamic"), f)
+		assert.NoError(t, err)
+
+		for _, nodeInfo := range nodeInfos {
 			status := tm.(*TopologyMatch).Filter(context.TODO(), nil, tc.pod, nodeInfo)
-			ret[node] = status
+			ret[nodeInfo.Node().Name] = status
 		}
 		// check result
 		for wantN, wantS := range tc.wantRes {
(The remaining 7 changed files are not shown in this view.)
