From 705cb01871caa6eb04502eeab17a1800de474c41 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Kuba=20Tu=C5=BCnik?=
Date: Thu, 21 Nov 2024 18:05:09 +0100
Subject: [PATCH] CA: extend SchedulerPluginRunner with RunReserveOnNode

RunReserveOnNode runs the Reserve phase of schedulerframework, which is
necessary to obtain ResourceClaim allocations computed by the DRA
scheduler plugin.

RunReserveOnNode isn't used anywhere yet, so this should be a no-op.
---
 .../predicate/plugin_runner.go      | 30 +++++++++++++------
 .../predicate/plugin_runner_test.go | 17 +++++++----
 .../predicate/predicate_snapshot.go | 11 +++----
 3 files changed, 39 insertions(+), 19 deletions(-)

diff --git a/cluster-autoscaler/simulator/clustersnapshot/predicate/plugin_runner.go b/cluster-autoscaler/simulator/clustersnapshot/predicate/plugin_runner.go
index 29d291221cee..b7b30e989959 100644
--- a/cluster-autoscaler/simulator/clustersnapshot/predicate/plugin_runner.go
+++ b/cluster-autoscaler/simulator/clustersnapshot/predicate/plugin_runner.go
@@ -44,10 +44,10 @@ func NewSchedulerPluginRunner(fwHandle *framework.Handle, snapshot clustersnapsh
 // function - until a Node where the filters pass is found. Filters are only run for matching Nodes. If no matching node with passing filters is found, an error is returned.
 //
 // The node iteration always starts from the next Node from the last Node that was found by this method. TODO: Extract the iteration strategy out of SchedulerPluginRunner.
-func (p *SchedulerPluginRunner) RunFiltersUntilPassingNode(pod *apiv1.Pod, nodeMatches func(*framework.NodeInfo) bool) (string, clustersnapshot.SchedulingError) {
+func (p *SchedulerPluginRunner) RunFiltersUntilPassingNode(pod *apiv1.Pod, nodeMatches func(*framework.NodeInfo) bool) (*apiv1.Node, *schedulerframework.CycleState, clustersnapshot.SchedulingError) {
 	nodeInfosList, err := p.snapshot.ListNodeInfos()
 	if err != nil {
-		return "", clustersnapshot.NewSchedulingInternalError(pod, "ClusterSnapshot not provided")
+		return nil, nil, clustersnapshot.NewSchedulingInternalError(pod, "ClusterSnapshot not provided")
 	}

 	p.fwHandle.DelegatingLister.UpdateDelegate(p.snapshot)
@@ -56,7 +56,7 @@ func (p *SchedulerPluginRunner) RunFiltersUntilPassingNode(pod *apiv1.Pod, nodeM
 	state := schedulerframework.NewCycleState()
 	preFilterResult, preFilterStatus, _ := p.fwHandle.Framework.RunPreFilterPlugins(context.TODO(), state, pod)
 	if !preFilterStatus.IsSuccess() {
-		return "", clustersnapshot.NewFailingPredicateError(pod, preFilterStatus.Plugin(), preFilterStatus.Reasons(), "PreFilter failed", "")
+		return nil, nil, clustersnapshot.NewFailingPredicateError(pod, preFilterStatus.Plugin(), preFilterStatus.Reasons(), "PreFilter failed", "")
 	}

 	for i := range nodeInfosList {
@@ -77,17 +77,17 @@ func (p *SchedulerPluginRunner) RunFiltersUntilPassingNode(pod *apiv1.Pod, nodeM
 		filterStatus := p.fwHandle.Framework.RunFilterPlugins(context.TODO(), state, pod, nodeInfo.ToScheduler())
 		if filterStatus.IsSuccess() {
 			p.lastIndex = (p.lastIndex + i + 1) % len(nodeInfosList)
-			return nodeInfo.Node().Name, nil
+			return nodeInfo.Node(), state, nil
 		}
 	}
-	return "", clustersnapshot.NewNoNodesPassingPredicatesFoundError(pod)
+	return nil, nil, clustersnapshot.NewNoNodesPassingPredicatesFoundError(pod)
 }

 // RunFiltersOnNode runs the scheduler framework PreFilter and Filter phases to check if the given pod can be scheduled on the given node.
-func (p *SchedulerPluginRunner) RunFiltersOnNode(pod *apiv1.Pod, nodeName string) clustersnapshot.SchedulingError {
+func (p *SchedulerPluginRunner) RunFiltersOnNode(pod *apiv1.Pod, nodeName string) (*apiv1.Node, *schedulerframework.CycleState, clustersnapshot.SchedulingError) {
 	nodeInfo, err := p.snapshot.GetNodeInfo(nodeName)
 	if err != nil {
-		return clustersnapshot.NewSchedulingInternalError(pod, fmt.Sprintf("error obtaining NodeInfo for name %q: %v", nodeName, err))
+		return nil, nil, clustersnapshot.NewSchedulingInternalError(pod, fmt.Sprintf("error obtaining NodeInfo for name %q: %v", nodeName, err))
 	}

 	p.fwHandle.DelegatingLister.UpdateDelegate(p.snapshot)
@@ -96,7 +96,7 @@ func (p *SchedulerPluginRunner) RunFiltersOnNode(pod *apiv1.Pod, nodeName string
 	state := schedulerframework.NewCycleState()
 	_, preFilterStatus, _ := p.fwHandle.Framework.RunPreFilterPlugins(context.TODO(), state, pod)
 	if !preFilterStatus.IsSuccess() {
-		return clustersnapshot.NewFailingPredicateError(pod, preFilterStatus.Plugin(), preFilterStatus.Reasons(), "PreFilter failed", "")
+		return nil, nil, clustersnapshot.NewFailingPredicateError(pod, preFilterStatus.Plugin(), preFilterStatus.Reasons(), "PreFilter failed", "")
 	}

 	filterStatus := p.fwHandle.Framework.RunFilterPlugins(context.TODO(), state, pod, nodeInfo.ToScheduler())
@@ -108,9 +108,21 @@ func (p *SchedulerPluginRunner) RunFiltersOnNode(pod *apiv1.Pod, nodeName string
 		if !filterStatus.IsRejected() {
 			unexpectedErrMsg = fmt.Sprintf("unexpected filter status %q", filterStatus.Code().String())
 		}
-		return clustersnapshot.NewFailingPredicateError(pod, filterName, filterReasons, unexpectedErrMsg, p.failingFilterDebugInfo(filterName, nodeInfo))
+		return nil, nil, clustersnapshot.NewFailingPredicateError(pod, filterName, filterReasons, unexpectedErrMsg, p.failingFilterDebugInfo(filterName, nodeInfo))
 	}
+	return nodeInfo.Node(), state, nil
+}
+
+// RunReserveOnNode runs the scheduler framework Reserve phase to update the scheduler plugins state to reflect the Pod being scheduled on the Node.
+func (p *SchedulerPluginRunner) RunReserveOnNode(pod *apiv1.Pod, nodeName string, postFilterState *schedulerframework.CycleState) error {
+	p.fwHandle.DelegatingLister.UpdateDelegate(p.snapshot)
+	defer p.fwHandle.DelegatingLister.ResetDelegate()
+
+	status := p.fwHandle.Framework.RunReservePluginsReserve(context.Background(), postFilterState, pod, nodeName)
+	if !status.IsSuccess() {
+		return fmt.Errorf("couldn't reserve node %s for pod %s/%s: %v", nodeName, pod.Namespace, pod.Name, status.Message())
+	}
 	return nil
 }

diff --git a/cluster-autoscaler/simulator/clustersnapshot/predicate/plugin_runner_test.go b/cluster-autoscaler/simulator/clustersnapshot/predicate/plugin_runner_test.go
index 1c84862c412c..c1d345c9e54b 100644
--- a/cluster-autoscaler/simulator/clustersnapshot/predicate/plugin_runner_test.go
+++ b/cluster-autoscaler/simulator/clustersnapshot/predicate/plugin_runner_test.go
@@ -144,8 +144,10 @@ func TestRunFiltersOnNode(t *testing.T) {
 			err = snapshot.AddNodeInfo(framework.NewTestNodeInfo(tt.node, tt.scheduledPods...))
 			assert.NoError(t, err)

-			predicateError := pluginRunner.RunFiltersOnNode(tt.testPod, tt.node.Name)
+			node, state, predicateError := pluginRunner.RunFiltersOnNode(tt.testPod, tt.node.Name)
 			if tt.expectError {
+				assert.Nil(t, node)
+				assert.Nil(t, state)
 				assert.NotNil(t, predicateError)
 				assert.Equal(t, clustersnapshot.FailingPredicateError, predicateError.Type())
 				assert.Equal(t, "NodeResourcesFit", predicateError.FailingPredicateName())
@@ -154,6 +156,8 @@ func TestRunFiltersOnNode(t *testing.T) {
 				assert.Contains(t, predicateError.Error(), "Insufficient cpu")
 			} else {
 				assert.Nil(t, predicateError)
+				assert.NotNil(t, state)
+				assert.Equal(t, tt.node, node)
 			}
 		})
 	}
@@ -243,12 +247,15 @@ func TestRunFilterUntilPassingNode(t *testing.T) {
 			err = snapshot.AddNodeInfo(framework.NewTestNodeInfo(n2000))
 			assert.NoError(t, err)

-			nodeName, err := pluginRunner.RunFiltersUntilPassingNode(tc.pod, func(info *framework.NodeInfo) bool { return true })
+			node, state, err := pluginRunner.RunFiltersUntilPassingNode(tc.pod, func(info *framework.NodeInfo) bool { return true })
 			if tc.expectError {
+				assert.Nil(t, node)
+				assert.Nil(t, state)
 				assert.Error(t, err)
 			} else {
 				assert.NoError(t, err)
-				assert.Contains(t, tc.expectedNodes, nodeName)
+				assert.NotNil(t, state)
+				assert.Contains(t, tc.expectedNodes, node.Name)
 			}
 		})
 	}
@@ -278,7 +285,7 @@ func TestDebugInfo(t *testing.T) {
 	err = clusterSnapshot.AddNodeInfo(framework.NewTestNodeInfo(node1))
 	assert.NoError(t, err)

-	predicateErr := defaultPluginnRunner.RunFiltersOnNode(p1, "n1")
+	_, _, predicateErr := defaultPluginnRunner.RunFiltersOnNode(p1, "n1")
 	assert.NotNil(t, predicateErr)
 	assert.Contains(t, predicateErr.FailingPredicateReasons(), "node(s) had untolerated taint {SomeTaint: WhyNot?}")
 	assert.Contains(t, predicateErr.Error(), "node(s) had untolerated taint {SomeTaint: WhyNot?}")
@@ -308,7 +315,7 @@ func TestDebugInfo(t *testing.T) {
 	err = clusterSnapshot.AddNodeInfo(framework.NewTestNodeInfo(node1))
 	assert.NoError(t, err)

-	predicateErr = customPluginnRunner.RunFiltersOnNode(p1, "n1")
+	_, _, predicateErr = customPluginnRunner.RunFiltersOnNode(p1, "n1")
 	assert.Nil(t, predicateErr)
 }

diff --git a/cluster-autoscaler/simulator/clustersnapshot/predicate/predicate_snapshot.go b/cluster-autoscaler/simulator/clustersnapshot/predicate/predicate_snapshot.go
index 7b119b077730..8e9c39c20074 100644
--- a/cluster-autoscaler/simulator/clustersnapshot/predicate/predicate_snapshot.go
+++ b/cluster-autoscaler/simulator/clustersnapshot/predicate/predicate_snapshot.go
@@ -74,7 +74,7 @@ func (s *PredicateSnapshot) RemoveNodeInfo(nodeName string) error {

 // SchedulePod adds pod to the snapshot and schedules it to given node.
 func (s *PredicateSnapshot) SchedulePod(pod *apiv1.Pod, nodeName string) clustersnapshot.SchedulingError {
-	if schedErr := s.pluginRunner.RunFiltersOnNode(pod, nodeName); schedErr != nil {
+	if _, _, schedErr := s.pluginRunner.RunFiltersOnNode(pod, nodeName); schedErr != nil {
 		return schedErr
 	}
 	if err := s.ForceAddPod(pod, nodeName); err != nil {
@@ -85,14 +85,14 @@ func (s *PredicateSnapshot) SchedulePod(pod *apiv1.Pod, nodeName string) cluster

 // SchedulePodOnAnyNodeMatching adds pod to the snapshot and schedules it to any node matching the provided function.
 func (s *PredicateSnapshot) SchedulePodOnAnyNodeMatching(pod *apiv1.Pod, anyNodeMatching func(*framework.NodeInfo) bool) (string, clustersnapshot.SchedulingError) {
-	nodeName, schedErr := s.pluginRunner.RunFiltersUntilPassingNode(pod, anyNodeMatching)
+	node, _, schedErr := s.pluginRunner.RunFiltersUntilPassingNode(pod, anyNodeMatching)
 	if schedErr != nil {
 		return "", schedErr
 	}
-	if err := s.ForceAddPod(pod, nodeName); err != nil {
+	if err := s.ForceAddPod(pod, node.Name); err != nil {
 		return "", clustersnapshot.NewSchedulingInternalError(pod, err.Error())
 	}
-	return nodeName, nil
+	return node.Name, nil
 }

 // UnschedulePod removes the given Pod from the given Node inside the snapshot.
@@ -102,5 +102,6 @@ func (s *PredicateSnapshot) UnschedulePod(namespace string, podName string, node

 // CheckPredicates checks whether scheduler predicates pass for the given pod on the given node.
 func (s *PredicateSnapshot) CheckPredicates(pod *apiv1.Pod, nodeName string) clustersnapshot.SchedulingError {
-	return s.pluginRunner.RunFiltersOnNode(pod, nodeName)
+	_, _, err := s.pluginRunner.RunFiltersOnNode(pod, nodeName)
+	return err
 }
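
Note (not part of the patch): RunReserveOnNode isn't called anywhere yet, so the sketch
below only illustrates how a caller such as PredicateSnapshot.SchedulePod could chain the
Reserve phase onto the new RunFiltersOnNode results once DRA support needs the
ResourceClaim allocations. The schedulePodWithReserve name and the error wrapping are
hypothetical and not taken from the repository; it would live alongside SchedulePod in
predicate_snapshot.go.

	// schedulePodWithReserve is a hypothetical follow-up: Filter, then Reserve, then add the pod.
	func (s *PredicateSnapshot) schedulePodWithReserve(pod *apiv1.Pod, nodeName string) clustersnapshot.SchedulingError {
		// Run the PreFilter/Filter phases first; the returned CycleState carries the
		// per-cycle plugin data (e.g. ResourceClaim allocations computed by the DRA plugin).
		node, state, schedErr := s.pluginRunner.RunFiltersOnNode(pod, nodeName)
		if schedErr != nil {
			return schedErr
		}
		// Reserve updates the scheduler plugins' state to reflect the pod landing on the node.
		if err := s.pluginRunner.RunReserveOnNode(pod, node.Name, state); err != nil {
			return clustersnapshot.NewSchedulingInternalError(pod, err.Error())
		}
		if err := s.ForceAddPod(pod, node.Name); err != nil {
			return clustersnapshot.NewSchedulingInternalError(pod, err.Error())
		}
		return nil
	}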