Commit

TMP
towca committed Nov 20, 2024
1 parent b93c596 commit b5df1d7
Showing 8 changed files with 210 additions and 26 deletions.
19 changes: 12 additions & 7 deletions cluster-autoscaler/dynamicresources/resource_claim_utils.go
@@ -18,6 +18,7 @@ package dynamicresources

import (
"fmt"
"k8s.io/component-helpers/scheduling/corev1"

apiv1 "k8s.io/api/core/v1"
resourceapi "k8s.io/api/resource/v1beta1"
@@ -52,14 +53,18 @@ func ClaimInUse(claim *resourceapi.ResourceClaim) bool {
return len(claim.Status.ReservedFor) > 0
}

// ClaimReservedForPod returns whether the provided claim is currently reserved for the provided Pod.
func ClaimReservedForPod(claim *resourceapi.ResourceClaim, pod *apiv1.Pod) bool {
for _, consumerRef := range claim.Status.ReservedFor {
if claimConsumerReferenceMatchesPod(pod, consumerRef) {
return true
}
// ClaimAvailableOnNode returns whether the provided claim is allocated and available on the provided Node.
func ClaimAvailableOnNode(claim *resourceapi.ResourceClaim, node *apiv1.Node) (bool, error) {
if !ClaimAllocated(claim) {
// Not allocated so not available anywhere.
return false, nil
}
selector := claim.Status.Allocation.NodeSelector
if selector == nil {
// nil means available everywhere.
return true, nil
}
return false
return corev1.MatchNodeSelectorTerms(node, claim.Status.Allocation.NodeSelector)
}

// DeallocateClaimInPlace clears the allocation of the provided claim.
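
For context, a minimal usage sketch of the new ClaimAvailableOnNode helper. The nodesWhereClaimFits wrapper below is hypothetical and not part of this commit; only the ClaimAvailableOnNode call and its signature come from the change above.

package example

import (
	"fmt"

	apiv1 "k8s.io/api/core/v1"
	resourceapi "k8s.io/api/resource/v1beta1"

	"k8s.io/autoscaler/cluster-autoscaler/dynamicresources"
)

// nodesWhereClaimFits is a hypothetical helper: it keeps only the nodes on
// which the given claim's allocation is available, using ClaimAvailableOnNode
// introduced in this commit.
func nodesWhereClaimFits(claim *resourceapi.ResourceClaim, nodes []*apiv1.Node) ([]*apiv1.Node, error) {
	var matching []*apiv1.Node
	for _, node := range nodes {
		available, err := dynamicresources.ClaimAvailableOnNode(claim, node)
		if err != nil {
			return nil, fmt.Errorf("checking claim %s/%s against node %s: %w", claim.Namespace, claim.Name, node.Name, err)
		}
		if available {
			matching = append(matching, node)
		}
	}
	return matching, nil
}
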
37 changes: 37 additions & 0 deletions cluster-autoscaler/dynamicresources/snapshot.go
@@ -18,6 +18,7 @@ package dynamicresources

import (
"fmt"
"k8s.io/autoscaler/cluster-autoscaler/simulator/framework"

apiv1 "k8s.io/api/core/v1"
resourceapi "k8s.io/api/resource/v1beta1"
@@ -57,6 +58,42 @@ func (s Snapshot) DeviceClasses() schedulerframework.DeviceClassLister {
return snapshotClassLister(s)
}

// WrapSchedulerNodeInfo wraps the provided *schedulerframework.NodeInfo into an internal *framework.NodeInfo, adding
// DRA information. Node-local ResourceSlices are added to the NodeInfo, and all ResourceClaims referenced by each Pod
// are added to each PodInfo. Returns an error if any of the Pods is missing a ResourceClaim.
func (s Snapshot) WrapSchedulerNodeInfo(schedNodeInfo *schedulerframework.NodeInfo) (*framework.NodeInfo, error) {
var pods []*framework.PodInfo
for _, podInfo := range schedNodeInfo.Pods {
podClaims, err := s.PodClaims(podInfo.Pod)
if err != nil {
return nil, err
}
pods = append(pods, &framework.PodInfo{
Pod: podInfo.Pod,
NeededResourceClaims: podClaims,
})
}
nodeSlices, _ := s.NodeResourceSlices(schedNodeInfo.Node())
return &framework.NodeInfo{
NodeInfo: schedNodeInfo,
LocalResourceSlices: nodeSlices,
Pods: pods,
}, nil
}

// WrapSchedulerNodeInfos calls WrapSchedulerNodeInfo() on a list of *schedulerframework.NodeInfos.
func (s Snapshot) WrapSchedulerNodeInfos(schedNodeInfos []*schedulerframework.NodeInfo) ([]*framework.NodeInfo, error) {
var result []*framework.NodeInfo
for _, schedNodeInfo := range schedNodeInfos {
nodeInfo, err := s.WrapSchedulerNodeInfo(schedNodeInfo)
if err != nil {
return nil, err
}
result = append(result, nodeInfo)
}
return result, nil
}

// Clone returns a copy of this Snapshot that can be independently modified without affecting this Snapshot.
// The only mutable objects in the Snapshot are ResourceClaims, so they are deep-copied. The rest is only a
// shallow copy.
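
A hypothetical illustration of what the wrapping adds: after WrapSchedulerNodeInfo, each PodInfo carries the ResourceClaims its Pod references, and the NodeInfo carries the node-local ResourceSlices. The listPodClaimNames helper and the schedulerframework import alias are assumptions for illustration, not part of this commit.

package example

import (
	"k8s.io/autoscaler/cluster-autoscaler/dynamicresources"
	schedulerframework "k8s.io/kubernetes/pkg/scheduler/framework"
)

// listPodClaimNames is a hypothetical helper: it wraps a scheduler NodeInfo
// with DRA information and returns the names of all ResourceClaims referenced
// by the Pods on that Node.
func listPodClaimNames(draSnapshot dynamicresources.Snapshot, schedNodeInfo *schedulerframework.NodeInfo) ([]string, error) {
	nodeInfo, err := draSnapshot.WrapSchedulerNodeInfo(schedNodeInfo)
	if err != nil {
		return nil, err
	}
	var names []string
	for _, podInfo := range nodeInfo.Pods {
		for _, claim := range podInfo.NeededResourceClaims {
			names = append(names, claim.Name)
		}
	}
	return names, nil
}
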
15 changes: 12 additions & 3 deletions cluster-autoscaler/simulator/clustersnapshot/base/basic.go
@@ -18,6 +18,7 @@ package base

import (
"fmt"
"k8s.io/autoscaler/cluster-autoscaler/dynamicresources"

apiv1 "k8s.io/api/core/v1"
"k8s.io/autoscaler/cluster-autoscaler/simulator/clustersnapshot"
@@ -35,6 +36,7 @@ type BasicSnapshotBase struct {
type internalBasicSnapshotData struct {
nodeInfoMap map[string]*schedulerframework.NodeInfo
pvcNamespacePodMap map[string]map[string]bool
draSnapshot dynamicresources.Snapshot
}

func (data *internalBasicSnapshotData) listNodeInfos() []*schedulerframework.NodeInfo {
@@ -123,6 +125,7 @@ func newInternalBasicSnapshotData() *internalBasicSnapshotData {
return &internalBasicSnapshotData{
nodeInfoMap: make(map[string]*schedulerframework.NodeInfo),
pvcNamespacePodMap: make(map[string]map[string]bool),
draSnapshot: dynamicresources.Snapshot{},
}
}

@@ -141,6 +144,7 @@ func (data *internalBasicSnapshotData) clone() *internalBasicSnapshotData {
return &internalBasicSnapshotData{
nodeInfoMap: clonedNodeInfoMap,
pvcNamespacePodMap: clonedPvcNamespaceNodeMap,
draSnapshot: data.draSnapshot.Clone(),
}
}

@@ -205,19 +209,24 @@ func (snapshot *BasicSnapshotBase) getInternalData() *internalBasicSnapshotData
return snapshot.data[len(snapshot.data)-1]
}

// DraSnapshot returns the DRA snapshot.
func (snapshot *BasicSnapshotBase) DraSnapshot() clustersnapshot.DraSnapshotMutator {
return snapshot.getInternalData().draSnapshot
}

// GetNodeInfo gets a NodeInfo.
func (snapshot *BasicSnapshotBase) GetNodeInfo(nodeName string) (*framework.NodeInfo, error) {
schedNodeInfo, err := snapshot.getInternalData().getNodeInfo(nodeName)
if err != nil {
return nil, err
}
return framework.WrapSchedulerNodeInfo(schedNodeInfo), nil
return snapshot.getInternalData().draSnapshot.WrapSchedulerNodeInfo(schedNodeInfo)
}

// ListNodeInfos lists NodeInfos.
func (snapshot *BasicSnapshotBase) ListNodeInfos() ([]*framework.NodeInfo, error) {
schedNodeInfos := snapshot.getInternalData().listNodeInfos()
return framework.WrapSchedulerNodeInfos(schedNodeInfos), nil
return snapshot.getInternalData().draSnapshot.WrapSchedulerNodeInfos(schedNodeInfos)
}

// AddNodeInfo adds a NodeInfo.
@@ -321,7 +330,7 @@ func (snapshot *BasicSnapshotBase) StorageInfos() schedulerframework.StorageInfo

// SharedDRAManager exposes snapshot as SharedDRAManager.
func (snapshot *BasicSnapshotBase) SharedDRAManager() schedulerframework.SharedDRAManager {
return nil
return snapshot.getInternalData().draSnapshot
}

// List returns the list of nodes in the snapshot.
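
Because clone() now also clones draSnapshot, DRA modifications made through DraSnapshot() after Fork() are discarded together with the rest of the forked snapshot state. A hypothetical sketch, not part of this commit; it assumes Revert() exists on SnapshotBase as its doc comment in clustersnapshot.go describes.

package example

import (
	resourceapi "k8s.io/api/resource/v1beta1"

	"k8s.io/autoscaler/cluster-autoscaler/simulator/clustersnapshot"
)

// simulateWithExtraSlices is a hypothetical helper: it adds ResourceSlices for
// a node inside a fork, runs a caller-provided simulation, and then reverts,
// relying on the forked data (including draSnapshot) being cloned per fork.
func simulateWithExtraSlices(snapshot clustersnapshot.SnapshotBase, nodeName string, slices []*resourceapi.ResourceSlice, simulate func() error) error {
	snapshot.Fork()
	defer snapshot.Revert() // drops the added slices along with all other forked modifications
	if err := snapshot.DraSnapshot().AddNodeResourceSlices(nodeName, slices); err != nil {
		return err
	}
	return simulate()
}
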
5 changes: 5 additions & 0 deletions cluster-autoscaler/simulator/clustersnapshot/base/delta.go
@@ -399,6 +399,11 @@ func NewDeltaSnapshotBase() *DeltaSnapshotBase {
return snapshot
}

// DraSnapshot returns the DRA snapshot.
func (snapshot *DeltaSnapshotBase) DraSnapshot() clustersnapshot.DraSnapshotMutator {
return nil
}

// GetNodeInfo gets a NodeInfo.
func (snapshot *DeltaSnapshotBase) GetNodeInfo(nodeName string) (*framework.NodeInfo, error) {
schedNodeInfo, err := snapshot.getNodeInfo(nodeName)
16 changes: 16 additions & 0 deletions cluster-autoscaler/simulator/clustersnapshot/clustersnapshot.go
@@ -19,6 +19,7 @@ package clustersnapshot
import (
"errors"
apiv1 "k8s.io/api/core/v1"
resourceapi "k8s.io/api/resource/v1beta1"
"k8s.io/autoscaler/cluster-autoscaler/simulator/framework"
"k8s.io/klog/v2"
)
@@ -77,6 +78,9 @@ type SnapshotBase interface {
// ListNodeInfos returns internal NodeInfos for all Nodes tracked in the snapshot. See the comment on GetNodeInfo.
ListNodeInfos() ([]*framework.NodeInfo, error)

// DraSnapshot returns an interface that allows accessing and modifying the DRA objects in the snapshot.
DraSnapshot() DraSnapshotMutator

// Fork creates a fork of snapshot state. All modifications can later be reverted to moment of forking via Revert().
// Use WithForkedSnapshot() helper function instead if possible.
Fork()
@@ -86,6 +90,18 @@ type SnapshotBase interface {
Commit() error
}

// DraSnapshotMutator allows accessing and modifying DRA objects in a snapshot.
type DraSnapshotMutator interface {
PodClaims(pod *apiv1.Pod) ([]*resourceapi.ResourceClaim, error)
RemovePodClaims(pod *apiv1.Pod)
ReservePodClaims(pod *apiv1.Pod) error
UnreservePodClaims(pod *apiv1.Pod) error

NodeResourceSlices(node *apiv1.Node) ([]*resourceapi.ResourceSlice, bool)
AddNodeResourceSlices(nodeName string, slices []*resourceapi.ResourceSlice) error
RemoveNodeResourceSlices(nodeName string)
}

// ErrNodeNotFound means that a node wasn't found in the snapshot.
var ErrNodeNotFound = errors.New("node not found")

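
A hypothetical sketch of using the new DraSnapshotMutator when simulating a pod being scheduled. The simulateBindClaims helper is illustrative and not part of this commit; only the interface methods it calls come from the change above.

package example

import (
	"fmt"

	apiv1 "k8s.io/api/core/v1"

	"k8s.io/autoscaler/cluster-autoscaler/simulator/clustersnapshot"
)

// simulateBindClaims is a hypothetical helper: before treating a pod as
// scheduled in the snapshot, verify that all ResourceClaims it references are
// tracked and reserve them for the pod.
func simulateBindClaims(dra clustersnapshot.DraSnapshotMutator, pod *apiv1.Pod) error {
	claims, err := dra.PodClaims(pod)
	if err != nil {
		return fmt.Errorf("pod %s/%s references claims missing from the snapshot: %w", pod.Namespace, pod.Name, err)
	}
	if len(claims) == 0 {
		// Nothing DRA-related to track for this pod.
		return nil
	}
	return dra.ReservePodClaims(pod)
}
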
@@ -44,10 +44,10 @@ func NewSchedulerPluginRunner(fwHandle *framework.Handle, snapshotBase clustersn
// function - until a Node where the filters pass is found. Filters are only run for matching Nodes. If no matching node with passing filters is found, an error is returned.
//
// The node iteration always starts from the next Node from the last Node that was found by this method. TODO: Extract the iteration strategy out of SchedulerPluginRunner.
func (p *SchedulerPluginRunner) RunFiltersUntilPassingNode(pod *apiv1.Pod, nodeMatches func(*framework.NodeInfo) bool) (string, clustersnapshot.SchedulingError) {
func (p *SchedulerPluginRunner) RunFiltersUntilPassingNode(pod *apiv1.Pod, nodeMatches func(*framework.NodeInfo) bool) (*apiv1.Node, *schedulerframework.CycleState, clustersnapshot.SchedulingError) {
nodeInfosList, err := p.snapshotBase.ListNodeInfos()
if err != nil {
return "", clustersnapshot.NewSchedulingInternalError(pod, "ClusterSnapshot not provided")
return nil, nil, clustersnapshot.NewSchedulingInternalError(pod, "ClusterSnapshot not provided")
}

p.fwHandle.DelegatingLister.UpdateDelegate(p.snapshotBase)
@@ -56,7 +56,7 @@ func (p *SchedulerPluginRunner) RunFiltersUntilPassingNode(pod *apiv1.Pod, nodeM
state := schedulerframework.NewCycleState()
preFilterResult, preFilterStatus, _ := p.fwHandle.Framework.RunPreFilterPlugins(context.TODO(), state, pod)
if !preFilterStatus.IsSuccess() {
return "", clustersnapshot.NewFailingPredicateError(pod, preFilterStatus.Plugin(), preFilterStatus.Reasons(), "PreFilter failed", "")
return nil, nil, clustersnapshot.NewFailingPredicateError(pod, preFilterStatus.Plugin(), preFilterStatus.Reasons(), "PreFilter failed", "")
}

for i := range nodeInfosList {
@@ -77,17 +77,17 @@ func (p *SchedulerPluginRunner) RunFiltersUntilPassingNode(pod *apiv1.Pod, nodeM
filterStatus := p.fwHandle.Framework.RunFilterPlugins(context.TODO(), state, pod, nodeInfo.ToScheduler())
if filterStatus.IsSuccess() {
p.lastIndex = (p.lastIndex + i + 1) % len(nodeInfosList)
return nodeInfo.Node().Name, nil
return nodeInfo.Node(), state, nil
}
}
return "", clustersnapshot.NewNoNodesPassingPredicatesFoundError(pod)
return nil, nil, clustersnapshot.NewNoNodesPassingPredicatesFoundError(pod)
}

// RunFiltersOnNode runs the scheduler framework PreFilter and Filter phases to check if the given pod can be scheduled on the given node.
func (p *SchedulerPluginRunner) RunFiltersOnNode(pod *apiv1.Pod, nodeName string) clustersnapshot.SchedulingError {
func (p *SchedulerPluginRunner) RunFiltersOnNode(pod *apiv1.Pod, nodeName string) (*apiv1.Node, *schedulerframework.CycleState, clustersnapshot.SchedulingError) {
nodeInfo, err := p.snapshotBase.GetNodeInfo(nodeName)
if err != nil {
return clustersnapshot.NewSchedulingInternalError(pod, fmt.Sprintf("error obtaining NodeInfo for name %q: %v", nodeName, err))
return nil, nil, clustersnapshot.NewSchedulingInternalError(pod, fmt.Sprintf("error obtaining NodeInfo for name %q: %v", nodeName, err))
}

p.fwHandle.DelegatingLister.UpdateDelegate(p.snapshotBase)
Expand All @@ -96,7 +96,7 @@ func (p *SchedulerPluginRunner) RunFiltersOnNode(pod *apiv1.Pod, nodeName string
state := schedulerframework.NewCycleState()
_, preFilterStatus, _ := p.fwHandle.Framework.RunPreFilterPlugins(context.TODO(), state, pod)
if !preFilterStatus.IsSuccess() {
return clustersnapshot.NewFailingPredicateError(pod, preFilterStatus.Plugin(), preFilterStatus.Reasons(), "PreFilter failed", "")
return nil, nil, clustersnapshot.NewFailingPredicateError(pod, preFilterStatus.Plugin(), preFilterStatus.Reasons(), "PreFilter failed", "")
}

filterStatus := p.fwHandle.Framework.RunFilterPlugins(context.TODO(), state, pod, nodeInfo.ToScheduler())
@@ -108,9 +108,21 @@ func (p *SchedulerPluginRunner) RunFiltersOnNode(pod *apiv1.Pod, nodeName string
if !filterStatus.IsRejected() {
unexpectedErrMsg = fmt.Sprintf("unexpected filter status %q", filterStatus.Code().String())
}
return clustersnapshot.NewFailingPredicateError(pod, filterName, filterReasons, unexpectedErrMsg, p.failingFilterDebugInfo(filterName, nodeInfo))
return nil, nil, clustersnapshot.NewFailingPredicateError(pod, filterName, filterReasons, unexpectedErrMsg, p.failingFilterDebugInfo(filterName, nodeInfo))
}

return nodeInfo.Node(), state, nil
}

// RunReserveOnNode runs the scheduler framework Reserve phase to update the scheduler plugins state to reflect the Pod being scheduled on the Node.
func (p *SchedulerPluginRunner) RunReserveOnNode(pod *apiv1.Pod, nodeName string, postFilterState *schedulerframework.CycleState) error {
p.fwHandle.DelegatingLister.UpdateDelegate(p.snapshotBase)
defer p.fwHandle.DelegatingLister.ResetDelegate()

status := p.fwHandle.Framework.RunReservePluginsReserve(context.Background(), postFilterState, pod, nodeName)
if !status.IsSuccess() {
return fmt.Errorf("couldn't reserve node %s for pod %s/%s: %v", nodeName, pod.Namespace, pod.Name, status.Message())
}
return nil
}

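
With the signature changes above, a caller can carry the CycleState from the Filter phase into the new Reserve phase. A hypothetical caller-side sketch, assumed to live in the same package as SchedulerPluginRunner (imports elided) and assuming clustersnapshot.SchedulingError satisfies the error interface:

// filterAndReserve is a hypothetical wrapper: RunFiltersUntilPassingNode now
// returns the chosen Node and the CycleState, which are handed to
// RunReserveOnNode so scheduler plugins (e.g. the DRA plugin) can record the
// pod's reservation in their cycle state.
func filterAndReserve(runner *SchedulerPluginRunner, pod *apiv1.Pod, nodeMatches func(*framework.NodeInfo) bool) (string, error) {
	node, state, schedErr := runner.RunFiltersUntilPassingNode(pod, nodeMatches)
	if schedErr != nil {
		return "", schedErr
	}
	if err := runner.RunReserveOnNode(pod, node.Name, state); err != nil {
		return "", err
	}
	return node.Name, nil
}
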