Skip to content

Commit

Permalink
CA: extend SchedulerPluginRunner with RunReserveOnNode
Browse files Browse the repository at this point in the history
RunReserveOnNode runs the Reserve phase of schedulerframework,
which is necessary to obtain ResourceClaim allocations computed
by the DRA scheduler plugin.

RunReserveOnNode isn't used anywhere yet, so this should be a no-op.
  • Loading branch information
towca committed Dec 4, 2024
1 parent 7efd397 commit de210cb
Show file tree
Hide file tree
Showing 3 changed files with 40 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -44,10 +44,10 @@ func NewSchedulerPluginRunner(fwHandle *framework.Handle, snapshot clustersnapsh
// function - until a Node where the Filters pass is found. Filters are only run for matching Nodes. If no matching Node with passing Filters is found, an error is returned.
//
// The node iteration always starts from the next Node from the last Node that was found by this method. TODO: Extract the iteration strategy out of SchedulerPluginRunner.
func (p *SchedulerPluginRunner) RunFiltersUntilPassingNode(pod *apiv1.Pod, nodeMatches func(*framework.NodeInfo) bool) (string, clustersnapshot.SchedulingError) {
func (p *SchedulerPluginRunner) RunFiltersUntilPassingNode(pod *apiv1.Pod, nodeMatches func(*framework.NodeInfo) bool) (*apiv1.Node, *schedulerframework.CycleState, clustersnapshot.SchedulingError) {
nodeInfosList, err := p.snapshot.ListNodeInfos()
if err != nil {
return "", clustersnapshot.NewSchedulingInternalError(pod, fmt.Sprintf("error listing NodeInfos: %v", err))
return nil, nil, clustersnapshot.NewSchedulingInternalError(pod, fmt.Sprintf("error listing NodeInfos: %v", err))
}

p.fwHandle.DelegatingLister.UpdateDelegate(p.snapshot)
Expand All @@ -61,7 +61,7 @@ func (p *SchedulerPluginRunner) RunFiltersUntilPassingNode(pod *apiv1.Pod, nodeM
if !preFilterStatus.IsSuccess() {
// If any of the plugin PreFilter methods isn't successful, the corresponding Filter method can't be run, so the whole scheduling cycle is aborted.
// Match that behavior here.
return "", clustersnapshot.NewFailingPredicateError(pod, preFilterStatus.Plugin(), preFilterStatus.Reasons(), "PreFilter failed", "")
return nil, nil, clustersnapshot.NewFailingPredicateError(pod, preFilterStatus.Plugin(), preFilterStatus.Reasons(), "PreFilter failed", "")
}

for i := range nodeInfosList {
Expand Down Expand Up @@ -92,18 +92,18 @@ func (p *SchedulerPluginRunner) RunFiltersUntilPassingNode(pod *apiv1.Pod, nodeM
if filterStatus.IsSuccess() {
// Filter passed for all plugins, so this pod can be scheduled on this Node.
p.lastIndex = (p.lastIndex + i + 1) % len(nodeInfosList)
return nodeInfo.Node().Name, nil
return nodeInfo.Node(), state, nil
}
// Filter didn't pass for some plugin, so this Node won't work - move on to the next one.
}
return "", clustersnapshot.NewNoNodesPassingPredicatesFoundError(pod)
return nil, nil, clustersnapshot.NewNoNodesPassingPredicatesFoundError(pod)
}

// RunFiltersOnNode runs the scheduler framework PreFilter and Filter phases to check if the given pod can be scheduled on the given node.
func (p *SchedulerPluginRunner) RunFiltersOnNode(pod *apiv1.Pod, nodeName string) clustersnapshot.SchedulingError {
func (p *SchedulerPluginRunner) RunFiltersOnNode(pod *apiv1.Pod, nodeName string) (*apiv1.Node, *schedulerframework.CycleState, clustersnapshot.SchedulingError) {
nodeInfo, err := p.snapshot.GetNodeInfo(nodeName)
if err != nil {
return clustersnapshot.NewSchedulingInternalError(pod, fmt.Sprintf("error obtaining NodeInfo for name %q: %v", nodeName, err))
return nil, nil, clustersnapshot.NewSchedulingInternalError(pod, fmt.Sprintf("error obtaining NodeInfo for name %q: %v", nodeName, err))
}

p.fwHandle.DelegatingLister.UpdateDelegate(p.snapshot)
Expand All @@ -113,10 +113,10 @@ func (p *SchedulerPluginRunner) RunFiltersOnNode(pod *apiv1.Pod, nodeName string
// Run the PreFilter phase of the framework for the Pod and check the results. See the corresponding comments in RunFiltersUntilPassingNode() for more info.
preFilterResult, preFilterStatus, _ := p.fwHandle.Framework.RunPreFilterPlugins(context.TODO(), state, pod)
if !preFilterStatus.IsSuccess() {
return clustersnapshot.NewFailingPredicateError(pod, preFilterStatus.Plugin(), preFilterStatus.Reasons(), "PreFilter failed", "")
return nil, nil, clustersnapshot.NewFailingPredicateError(pod, preFilterStatus.Plugin(), preFilterStatus.Reasons(), "PreFilter failed", "")
}
if !preFilterResult.AllNodes() && !preFilterResult.NodeNames.Has(nodeInfo.Node().Name) {
return clustersnapshot.NewFailingPredicateError(pod, preFilterStatus.Plugin(), preFilterStatus.Reasons(), "PreFilter filtered the Node out", "")
return nil, nil, clustersnapshot.NewFailingPredicateError(pod, preFilterStatus.Plugin(), preFilterStatus.Reasons(), "PreFilter filtered the Node out", "")
}

// Run the Filter phase of the framework for the Pod and the Node and check the results. See the corresponding comments in RunFiltersUntilPassingNode() for more info.
Expand All @@ -128,10 +128,22 @@ func (p *SchedulerPluginRunner) RunFiltersOnNode(pod *apiv1.Pod, nodeName string
if !filterStatus.IsRejected() {
unexpectedErrMsg = fmt.Sprintf("unexpected filter status %q", filterStatus.Code().String())
}
return clustersnapshot.NewFailingPredicateError(pod, filterName, filterReasons, unexpectedErrMsg, p.failingFilterDebugInfo(filterName, nodeInfo))
return nil, nil, clustersnapshot.NewFailingPredicateError(pod, filterName, filterReasons, unexpectedErrMsg, p.failingFilterDebugInfo(filterName, nodeInfo))
}

// PreFilter and Filter phases checked, this Pod can be scheduled on this Node.
return nodeInfo.Node(), state, nil
}

// RunReserveOnNode runs the scheduler framework Reserve phase to update the scheduler plugins state to reflect the Pod being scheduled on the Node.
func (p *SchedulerPluginRunner) RunReserveOnNode(pod *apiv1.Pod, nodeName string, postFilterState *schedulerframework.CycleState) error {
p.fwHandle.DelegatingLister.UpdateDelegate(p.snapshot)
defer p.fwHandle.DelegatingLister.ResetDelegate()

status := p.fwHandle.Framework.RunReservePluginsReserve(context.Background(), postFilterState, pod, nodeName)
if !status.IsSuccess() {
return fmt.Errorf("couldn't reserve node %s for pod %s/%s: %v", nodeName, pod.Namespace, pod.Name, status.Message())
}
return nil
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -144,8 +144,10 @@ func TestRunFiltersOnNode(t *testing.T) {
err = snapshot.AddNodeInfo(framework.NewTestNodeInfo(tt.node, tt.scheduledPods...))
assert.NoError(t, err)

predicateError := pluginRunner.RunFiltersOnNode(tt.testPod, tt.node.Name)
node, state, predicateError := pluginRunner.RunFiltersOnNode(tt.testPod, tt.node.Name)
if tt.expectError {
assert.Nil(t, node)
assert.Nil(t, state)
assert.NotNil(t, predicateError)
assert.Equal(t, clustersnapshot.FailingPredicateError, predicateError.Type())
assert.Equal(t, "NodeResourcesFit", predicateError.FailingPredicateName())
Expand All @@ -154,6 +156,8 @@ func TestRunFiltersOnNode(t *testing.T) {
assert.Contains(t, predicateError.Error(), "Insufficient cpu")
} else {
assert.Nil(t, predicateError)
assert.NotNil(t, state)
assert.Equal(t, tt.node, node)
}
})
}
Expand Down Expand Up @@ -243,12 +247,15 @@ func TestRunFilterUntilPassingNode(t *testing.T) {
err = snapshot.AddNodeInfo(framework.NewTestNodeInfo(n2000))
assert.NoError(t, err)

nodeName, err := pluginRunner.RunFiltersUntilPassingNode(tc.pod, func(info *framework.NodeInfo) bool { return true })
node, state, err := pluginRunner.RunFiltersUntilPassingNode(tc.pod, func(info *framework.NodeInfo) bool { return true })
if tc.expectError {
assert.Nil(t, node)
assert.Nil(t, state)
assert.Error(t, err)
} else {
assert.NoError(t, err)
assert.Contains(t, tc.expectedNodes, nodeName)
assert.NotNil(t, state)
assert.Contains(t, tc.expectedNodes, node.Name)
}
})
}
Expand Down Expand Up @@ -278,7 +285,7 @@ func TestDebugInfo(t *testing.T) {
err = clusterSnapshot.AddNodeInfo(framework.NewTestNodeInfo(node1))
assert.NoError(t, err)

predicateErr := defaultPluginnRunner.RunFiltersOnNode(p1, "n1")
_, _, predicateErr := defaultPluginnRunner.RunFiltersOnNode(p1, "n1")
assert.NotNil(t, predicateErr)
assert.Contains(t, predicateErr.FailingPredicateReasons(), "node(s) had untolerated taint {SomeTaint: WhyNot?}")
assert.Contains(t, predicateErr.Error(), "node(s) had untolerated taint {SomeTaint: WhyNot?}")
Expand Down Expand Up @@ -308,7 +315,7 @@ func TestDebugInfo(t *testing.T) {
err = clusterSnapshot.AddNodeInfo(framework.NewTestNodeInfo(node1))
assert.NoError(t, err)

predicateErr = customPluginnRunner.RunFiltersOnNode(p1, "n1")
_, _, predicateErr = customPluginnRunner.RunFiltersOnNode(p1, "n1")
assert.Nil(t, predicateErr)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ func (s *PredicateSnapshot) RemoveNodeInfo(nodeName string) error {

// SchedulePod adds pod to the snapshot and schedules it to given node.
func (s *PredicateSnapshot) SchedulePod(pod *apiv1.Pod, nodeName string) clustersnapshot.SchedulingError {
if schedErr := s.pluginRunner.RunFiltersOnNode(pod, nodeName); schedErr != nil {
if _, _, schedErr := s.pluginRunner.RunFiltersOnNode(pod, nodeName); schedErr != nil {
return schedErr
}
if err := s.ClusterSnapshotStore.ForceAddPod(pod, nodeName); err != nil {
Expand All @@ -85,14 +85,14 @@ func (s *PredicateSnapshot) SchedulePod(pod *apiv1.Pod, nodeName string) cluster

// SchedulePodOnAnyNodeMatching adds pod to the snapshot and schedules it to any node matching the provided function.
func (s *PredicateSnapshot) SchedulePodOnAnyNodeMatching(pod *apiv1.Pod, anyNodeMatching func(*framework.NodeInfo) bool) (string, clustersnapshot.SchedulingError) {
nodeName, schedErr := s.pluginRunner.RunFiltersUntilPassingNode(pod, anyNodeMatching)
node, _, schedErr := s.pluginRunner.RunFiltersUntilPassingNode(pod, anyNodeMatching)
if schedErr != nil {
return "", schedErr
}
if err := s.ClusterSnapshotStore.ForceAddPod(pod, nodeName); err != nil {
if err := s.ClusterSnapshotStore.ForceAddPod(pod, node.Name); err != nil {
return "", clustersnapshot.NewSchedulingInternalError(pod, err.Error())
}
return nodeName, nil
return node.Name, nil
}

// UnschedulePod removes the given Pod from the given Node inside the snapshot.
Expand All @@ -102,5 +102,6 @@ func (s *PredicateSnapshot) UnschedulePod(namespace string, podName string, node

// CheckPredicates checks whether scheduler predicates pass for the given pod on the given node.
func (s *PredicateSnapshot) CheckPredicates(pod *apiv1.Pod, nodeName string) clustersnapshot.SchedulingError {
return s.pluginRunner.RunFiltersOnNode(pod, nodeName)
_, _, err := s.pluginRunner.RunFiltersOnNode(pod, nodeName)
return err
}

0 comments on commit de210cb

Please sign in to comment.