From de210cb0a5342af178b13050fc90c7c0b4b57041 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Kuba=20Tu=C5=BCnik?=
Date: Thu, 21 Nov 2024 18:05:09 +0100
Subject: [PATCH] CA: extend SchedulerPluginRunner with RunReserveOnNode

RunReserveOnNode runs the Reserve phase of schedulerframework, which is
necessary to obtain ResourceClaim allocations computed by the DRA
scheduler plugin.

RunReserveOnNode isn't used anywhere yet, so this should be a no-op.
---
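For context, a rough sketch of the intended follow-up wiring (an assumption,
not something this patch implements): a caller such as
PredicateSnapshot.SchedulePod could chain RunReserveOnNode onto the Node and
CycleState now returned by RunFiltersOnNode, so that Reserve plugins (e.g. the
DRA plugin) record their allocations before the Pod is force-added:

	// Sketch only - hypothetical follow-up wiring, not part of this patch.
	func (s *PredicateSnapshot) SchedulePod(pod *apiv1.Pod, nodeName string) clustersnapshot.SchedulingError {
		// RunFiltersOnNode now returns the CycleState populated by PreFilter/Filter,
		// which is exactly the state the Reserve phase needs to see.
		node, cycleState, schedErr := s.pluginRunner.RunFiltersOnNode(pod, nodeName)
		if schedErr != nil {
			return schedErr
		}
		// Reserve lets plugins (e.g. DRA) record per-Pod allocations such as ResourceClaims.
		if err := s.pluginRunner.RunReserveOnNode(pod, node.Name, cycleState); err != nil {
			return clustersnapshot.NewSchedulingInternalError(pod, err.Error())
		}
		if err := s.ClusterSnapshotStore.ForceAddPod(pod, node.Name); err != nil {
			return clustersnapshot.NewSchedulingInternalError(pod, err.Error())
		}
		return nil
	}
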
 .../predicate/plugin_runner.go      | 32 +++++++++++++------
 .../predicate/plugin_runner_test.go | 17 +++++++---
 .../predicate/predicate_snapshot.go | 11 ++++---
 3 files changed, 40 insertions(+), 20 deletions(-)

diff --git a/cluster-autoscaler/simulator/clustersnapshot/predicate/plugin_runner.go b/cluster-autoscaler/simulator/clustersnapshot/predicate/plugin_runner.go
index 5a0953009ad0..e38e63bcbb21 100644
--- a/cluster-autoscaler/simulator/clustersnapshot/predicate/plugin_runner.go
+++ b/cluster-autoscaler/simulator/clustersnapshot/predicate/plugin_runner.go
@@ -44,10 +44,10 @@ func NewSchedulerPluginRunner(fwHandle *framework.Handle, snapshot clustersnapsh
 // function - until a Node where the Filters pass is found. Filters are only run for matching Nodes. If no matching Node with passing Filters is found, an error is returned.
 //
 // The node iteration always starts from the next Node from the last Node that was found by this method. TODO: Extract the iteration strategy out of SchedulerPluginRunner.
-func (p *SchedulerPluginRunner) RunFiltersUntilPassingNode(pod *apiv1.Pod, nodeMatches func(*framework.NodeInfo) bool) (string, clustersnapshot.SchedulingError) {
+func (p *SchedulerPluginRunner) RunFiltersUntilPassingNode(pod *apiv1.Pod, nodeMatches func(*framework.NodeInfo) bool) (*apiv1.Node, *schedulerframework.CycleState, clustersnapshot.SchedulingError) {
 	nodeInfosList, err := p.snapshot.ListNodeInfos()
 	if err != nil {
-		return "", clustersnapshot.NewSchedulingInternalError(pod, fmt.Sprintf("error listing NodeInfos: %v", err))
+		return nil, nil, clustersnapshot.NewSchedulingInternalError(pod, fmt.Sprintf("error listing NodeInfos: %v", err))
 	}
 
 	p.fwHandle.DelegatingLister.UpdateDelegate(p.snapshot)
@@ -61,7 +61,7 @@ func (p *SchedulerPluginRunner) RunFiltersUntilPassingNode(pod *apiv1.Pod, nodeM
 	if !preFilterStatus.IsSuccess() {
 		// If any of the plugin PreFilter methods isn't successful, the corresponding Filter method can't be run, so the whole scheduling cycle is aborted.
 		// Match that behavior here.
-		return "", clustersnapshot.NewFailingPredicateError(pod, preFilterStatus.Plugin(), preFilterStatus.Reasons(), "PreFilter failed", "")
+		return nil, nil, clustersnapshot.NewFailingPredicateError(pod, preFilterStatus.Plugin(), preFilterStatus.Reasons(), "PreFilter failed", "")
 	}
 
 	for i := range nodeInfosList {
@@ -92,18 +92,18 @@ func (p *SchedulerPluginRunner) RunFiltersUntilPassingNode(pod *apiv1.Pod, nodeM
 		if filterStatus.IsSuccess() {
 			// Filter passed for all plugins, so this pod can be scheduled on this Node.
 			p.lastIndex = (p.lastIndex + i + 1) % len(nodeInfosList)
-			return nodeInfo.Node().Name, nil
+			return nodeInfo.Node(), state, nil
 		}
 		// Filter didn't pass for some plugin, so this Node won't work - move on to the next one.
 	}
-	return "", clustersnapshot.NewNoNodesPassingPredicatesFoundError(pod)
+	return nil, nil, clustersnapshot.NewNoNodesPassingPredicatesFoundError(pod)
 }
 
 // RunFiltersOnNode runs the scheduler framework PreFilter and Filter phases to check if the given pod can be scheduled on the given node.
-func (p *SchedulerPluginRunner) RunFiltersOnNode(pod *apiv1.Pod, nodeName string) clustersnapshot.SchedulingError {
+func (p *SchedulerPluginRunner) RunFiltersOnNode(pod *apiv1.Pod, nodeName string) (*apiv1.Node, *schedulerframework.CycleState, clustersnapshot.SchedulingError) {
 	nodeInfo, err := p.snapshot.GetNodeInfo(nodeName)
 	if err != nil {
-		return clustersnapshot.NewSchedulingInternalError(pod, fmt.Sprintf("error obtaining NodeInfo for name %q: %v", nodeName, err))
+		return nil, nil, clustersnapshot.NewSchedulingInternalError(pod, fmt.Sprintf("error obtaining NodeInfo for name %q: %v", nodeName, err))
 	}
 
 	p.fwHandle.DelegatingLister.UpdateDelegate(p.snapshot)
@@ -113,10 +113,10 @@ func (p *SchedulerPluginRunner) RunFiltersOnNode(pod *apiv1.Pod, nodeName string
 	// Run the PreFilter phase of the framework for the Pod and check the results. See the corresponding comments in RunFiltersUntilPassingNode() for more info.
 	preFilterResult, preFilterStatus, _ := p.fwHandle.Framework.RunPreFilterPlugins(context.TODO(), state, pod)
 	if !preFilterStatus.IsSuccess() {
-		return clustersnapshot.NewFailingPredicateError(pod, preFilterStatus.Plugin(), preFilterStatus.Reasons(), "PreFilter failed", "")
+		return nil, nil, clustersnapshot.NewFailingPredicateError(pod, preFilterStatus.Plugin(), preFilterStatus.Reasons(), "PreFilter failed", "")
 	}
 	if !preFilterResult.AllNodes() && !preFilterResult.NodeNames.Has(nodeInfo.Node().Name) {
-		return clustersnapshot.NewFailingPredicateError(pod, preFilterStatus.Plugin(), preFilterStatus.Reasons(), "PreFilter filtered the Node out", "")
+		return nil, nil, clustersnapshot.NewFailingPredicateError(pod, preFilterStatus.Plugin(), preFilterStatus.Reasons(), "PreFilter filtered the Node out", "")
 	}
 
 	// Run the Filter phase of the framework for the Pod and the Node and check the results. See the corresponding comments in RunFiltersUntilPassingNode() for more info.
@@ -128,10 +128,22 @@ func (p *SchedulerPluginRunner) RunFiltersOnNode(pod *apiv1.Pod, nodeName string
 		if !filterStatus.IsRejected() {
 			unexpectedErrMsg = fmt.Sprintf("unexpected filter status %q", filterStatus.Code().String())
 		}
-		return clustersnapshot.NewFailingPredicateError(pod, filterName, filterReasons, unexpectedErrMsg, p.failingFilterDebugInfo(filterName, nodeInfo))
+		return nil, nil, clustersnapshot.NewFailingPredicateError(pod, filterName, filterReasons, unexpectedErrMsg, p.failingFilterDebugInfo(filterName, nodeInfo))
 	}
 
 	// PreFilter and Filter phases checked, this Pod can be scheduled on this Node.
+	return nodeInfo.Node(), state, nil
+}
+
+// RunReserveOnNode runs the scheduler framework Reserve phase to update the scheduler plugins state to reflect the Pod being scheduled on the Node.
+func (p *SchedulerPluginRunner) RunReserveOnNode(pod *apiv1.Pod, nodeName string, postFilterState *schedulerframework.CycleState) error {
+	p.fwHandle.DelegatingLister.UpdateDelegate(p.snapshot)
+	defer p.fwHandle.DelegatingLister.ResetDelegate()
+
+	status := p.fwHandle.Framework.RunReservePluginsReserve(context.Background(), postFilterState, pod, nodeName)
+	if !status.IsSuccess() {
+		return fmt.Errorf("couldn't reserve node %s for pod %s/%s: %v", nodeName, pod.Namespace, pod.Name, status.Message())
+	}
 	return nil
 }
 
diff --git a/cluster-autoscaler/simulator/clustersnapshot/predicate/plugin_runner_test.go b/cluster-autoscaler/simulator/clustersnapshot/predicate/plugin_runner_test.go
index 5c6a7c9d41db..f7015d005845 100644
--- a/cluster-autoscaler/simulator/clustersnapshot/predicate/plugin_runner_test.go
+++ b/cluster-autoscaler/simulator/clustersnapshot/predicate/plugin_runner_test.go
@@ -144,8 +144,10 @@ func TestRunFiltersOnNode(t *testing.T) {
 			err = snapshot.AddNodeInfo(framework.NewTestNodeInfo(tt.node, tt.scheduledPods...))
 			assert.NoError(t, err)
 
-			predicateError := pluginRunner.RunFiltersOnNode(tt.testPod, tt.node.Name)
+			node, state, predicateError := pluginRunner.RunFiltersOnNode(tt.testPod, tt.node.Name)
 			if tt.expectError {
+				assert.Nil(t, node)
+				assert.Nil(t, state)
 				assert.NotNil(t, predicateError)
 				assert.Equal(t, clustersnapshot.FailingPredicateError, predicateError.Type())
 				assert.Equal(t, "NodeResourcesFit", predicateError.FailingPredicateName())
@@ -154,6 +156,8 @@
 				assert.Contains(t, predicateError.Error(), "Insufficient cpu")
 			} else {
 				assert.Nil(t, predicateError)
+				assert.NotNil(t, state)
+				assert.Equal(t, tt.node, node)
 			}
 		})
 	}
@@ -243,12 +247,15 @@ func TestRunFilterUntilPassingNode(t *testing.T) {
 			err = snapshot.AddNodeInfo(framework.NewTestNodeInfo(n2000))
 			assert.NoError(t, err)
 
-			nodeName, err := pluginRunner.RunFiltersUntilPassingNode(tc.pod, func(info *framework.NodeInfo) bool { return true })
+			node, state, err := pluginRunner.RunFiltersUntilPassingNode(tc.pod, func(info *framework.NodeInfo) bool { return true })
 			if tc.expectError {
+				assert.Nil(t, node)
+				assert.Nil(t, state)
 				assert.Error(t, err)
 			} else {
 				assert.NoError(t, err)
-				assert.Contains(t, tc.expectedNodes, nodeName)
+				assert.NotNil(t, state)
+				assert.Contains(t, tc.expectedNodes, node.Name)
 			}
 		})
 	}
@@ -278,7 +285,7 @@ func TestDebugInfo(t *testing.T) {
 	err = clusterSnapshot.AddNodeInfo(framework.NewTestNodeInfo(node1))
 	assert.NoError(t, err)
 
-	predicateErr := defaultPluginnRunner.RunFiltersOnNode(p1, "n1")
+	_, _, predicateErr := defaultPluginnRunner.RunFiltersOnNode(p1, "n1")
 	assert.NotNil(t, predicateErr)
 	assert.Contains(t, predicateErr.FailingPredicateReasons(), "node(s) had untolerated taint {SomeTaint: WhyNot?}")
 	assert.Contains(t, predicateErr.Error(), "node(s) had untolerated taint {SomeTaint: WhyNot?}")
@@ -308,7 +315,7 @@ func TestDebugInfo(t *testing.T) {
 	err = clusterSnapshot.AddNodeInfo(framework.NewTestNodeInfo(node1))
 	assert.NoError(t, err)
 
-	predicateErr = customPluginnRunner.RunFiltersOnNode(p1, "n1")
+	_, _, predicateErr = customPluginnRunner.RunFiltersOnNode(p1, "n1")
 	assert.Nil(t, predicateErr)
 }
 
diff --git a/cluster-autoscaler/simulator/clustersnapshot/predicate/predicate_snapshot.go b/cluster-autoscaler/simulator/clustersnapshot/predicate/predicate_snapshot.go
index dd57b47ffa39..d59fc9bfa135 100644
--- a/cluster-autoscaler/simulator/clustersnapshot/predicate/predicate_snapshot.go
+++ b/cluster-autoscaler/simulator/clustersnapshot/predicate/predicate_snapshot.go
@@ -74,7 +74,7 @@ func (s *PredicateSnapshot) RemoveNodeInfo(nodeName string) error {
 
 // SchedulePod adds pod to the snapshot and schedules it to given node.
 func (s *PredicateSnapshot) SchedulePod(pod *apiv1.Pod, nodeName string) clustersnapshot.SchedulingError {
-	if schedErr := s.pluginRunner.RunFiltersOnNode(pod, nodeName); schedErr != nil {
+	if _, _, schedErr := s.pluginRunner.RunFiltersOnNode(pod, nodeName); schedErr != nil {
 		return schedErr
 	}
 	if err := s.ClusterSnapshotStore.ForceAddPod(pod, nodeName); err != nil {
@@ -85,14 +85,14 @@ func (s *PredicateSnapshot) SchedulePod(pod *apiv1.Pod, nodeName string) cluster
 
 // SchedulePodOnAnyNodeMatching adds pod to the snapshot and schedules it to any node matching the provided function.
 func (s *PredicateSnapshot) SchedulePodOnAnyNodeMatching(pod *apiv1.Pod, anyNodeMatching func(*framework.NodeInfo) bool) (string, clustersnapshot.SchedulingError) {
-	nodeName, schedErr := s.pluginRunner.RunFiltersUntilPassingNode(pod, anyNodeMatching)
+	node, _, schedErr := s.pluginRunner.RunFiltersUntilPassingNode(pod, anyNodeMatching)
 	if schedErr != nil {
 		return "", schedErr
 	}
-	if err := s.ClusterSnapshotStore.ForceAddPod(pod, nodeName); err != nil {
+	if err := s.ClusterSnapshotStore.ForceAddPod(pod, node.Name); err != nil {
 		return "", clustersnapshot.NewSchedulingInternalError(pod, err.Error())
 	}
-	return nodeName, nil
+	return node.Name, nil
 }
 
 // UnschedulePod removes the given Pod from the given Node inside the snapshot.
@@ -102,5 +102,6 @@ func (s *PredicateSnapshot) UnschedulePod(namespace string, podName string, node
 
 // CheckPredicates checks whether scheduler predicates pass for the given pod on the given node.
 func (s *PredicateSnapshot) CheckPredicates(pod *apiv1.Pod, nodeName string) clustersnapshot.SchedulingError {
-	return s.pluginRunner.RunFiltersOnNode(pod, nodeName)
+	_, _, err := s.pluginRunner.RunFiltersOnNode(pod, nodeName)
+	return err
 }