diff --git a/pkg/cache/clusterqueue.go b/pkg/cache/clusterqueue.go
index 79f48ba40c..f524918f69 100644
--- a/pkg/cache/clusterqueue.go
+++ b/pkg/cache/clusterqueue.go
@@ -508,7 +508,7 @@ func (c *clusterQueue) updateWorkloadUsage(wi *workload.Info, m int64) {
 			removeUsage(c, fr, q)
 		}
 	}
-	if features.Enabled(features.TopologyAwareScheduling) && wi.IsRequestingTAS() {
+	if features.Enabled(features.TopologyAwareScheduling) && wi.IsUsingTAS() {
 		for tasFlavor, tasUsage := range wi.TASUsage() {
 			if tasFlvCache := c.tasFlavorCache(tasFlavor); tasFlvCache != nil {
 				if m == 1 {
@@ -627,3 +627,14 @@ func workloadBelongsToLocalQueue(wl *kueue.Workload, q *kueue.LocalQueue) bool {
 func (c *clusterQueue) fairWeight() *resource.Quantity {
 	return &c.FairWeight
 }
+
+func (c *clusterQueue) isTASOnly() bool {
+	for _, rg := range c.ResourceGroups {
+		for _, fName := range rg.Flavors {
+			if _, found := c.tasFlavors[fName]; !found {
+				return false
+			}
+		}
+	}
+	return true
+}
diff --git a/pkg/cache/clusterqueue_snapshot.go b/pkg/cache/clusterqueue_snapshot.go
index 54bb07a371..12f76e659e 100644
--- a/pkg/cache/clusterqueue_snapshot.go
+++ b/pkg/cache/clusterqueue_snapshot.go
@@ -55,6 +55,7 @@ type ClusterQueueSnapshot struct {
 	hierarchy.ClusterQueue[*CohortSnapshot]
 
 	TASFlavors map[kueue.ResourceFlavorReference]*TASFlavorSnapshot
+	tasOnly    bool
 }
 
 // RGByResource returns the ResourceGroup which contains capacity
@@ -214,3 +215,7 @@ func (c *ClusterQueueSnapshot) FindTopologyAssignmentsForWorkload(
 	}
 	return result
 }
+
+func (c *ClusterQueueSnapshot) IsTASOnly() bool {
+	return c.tasOnly
+}
diff --git a/pkg/cache/snapshot.go b/pkg/cache/snapshot.go
index 660376080a..3d78bd83ca 100644
--- a/pkg/cache/snapshot.go
+++ b/pkg/cache/snapshot.go
@@ -151,6 +151,7 @@ func snapshotClusterQueue(c *clusterQueue) *ClusterQueueSnapshot {
 		AdmissionChecks: utilmaps.DeepCopySets[kueue.ResourceFlavorReference](c.AdmissionChecks),
 		ResourceNode:    c.resourceNode.Clone(),
 		TASFlavors:      make(map[kueue.ResourceFlavorReference]*TASFlavorSnapshot),
+		tasOnly:         c.isTASOnly(),
 	}
 	for i, rg := range c.ResourceGroups {
 		cc.ResourceGroups[i] = rg.Clone()
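Note (not part of the patch): the three cache changes above introduce a per-ClusterQueue "TAS-only" property, computed once by clusterQueue.isTASOnly and frozen into the snapshot as tasOnly, so the scheduling cycle never re-derives it from the live cache. A minimal standalone sketch of the invariant, using made-up stand-in types (flavorRef, resourceGroup, clusterQueue here are simplifications, not the real cache types):

package main

import "fmt"

type flavorRef string

type resourceGroup struct{ flavors []flavorRef }

type clusterQueue struct {
	resourceGroups []resourceGroup
	tasFlavors     map[flavorRef]struct{} // flavors backed by a Topology
}

// isTASOnly: a queue is TAS-only iff every flavor of every resource
// group is TAS-backed; a single non-TAS flavor makes the queue "mixed".
func (c *clusterQueue) isTASOnly() bool {
	for _, rg := range c.resourceGroups {
		for _, f := range rg.flavors {
			if _, ok := c.tasFlavors[f]; !ok {
				return false
			}
		}
	}
	return true
}

func main() {
	cq := clusterQueue{
		resourceGroups: []resourceGroup{{flavors: []flavorRef{"tas-default"}}},
		tasFlavors:     map[flavorRef]struct{}{"tas-default": {}},
	}
	fmt.Println(cq.isTASOnly()) // true: the only flavor is TAS-backed
}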
diff --git a/pkg/cache/tas_flavor_snapshot.go b/pkg/cache/tas_flavor_snapshot.go
index f361efb08e..3d4a1b6519 100644
--- a/pkg/cache/tas_flavor_snapshot.go
+++ b/pkg/cache/tas_flavor_snapshot.go
@@ -26,6 +26,7 @@ import (
 	"github.com/go-logr/logr"
 	corev1 "k8s.io/api/core/v1"
 	corev1helpers "k8s.io/component-helpers/scheduling/corev1"
+	"k8s.io/utils/ptr"
 
 	kueue "sigs.k8s.io/kueue/apis/kueue/v1beta1"
 	"sigs.k8s.io/kueue/pkg/features"
@@ -157,7 +158,11 @@ func (s *TASFlavorSnapshot) addNode(node corev1.Node) utiltas.TopologyDomainID {
 }
 
 func (s *TASFlavorSnapshot) isLowestLevelNode() bool {
-	return s.levelKeys[len(s.levelKeys)-1] == corev1.LabelHostname
+	return s.lowestLevel() == corev1.LabelHostname
+}
+
+func (s *TASFlavorSnapshot) lowestLevel() string {
+	return s.levelKeys[len(s.levelKeys)-1]
 }
 
 // initialize prepares the topology tree structure. This structure holds
@@ -249,6 +254,7 @@ type TASPodSetRequests struct {
 	SinglePodRequests resources.Requests
 	Count             int32
 	Flavor            kueue.ResourceFlavorReference
+	Implied           bool
 }
 
 func (t *TASPodSetRequests) TotalRequests() resources.Requests {
@@ -340,13 +346,12 @@ func (s *TASFlavorSnapshot) findTopologyAssignment(
 	tasPodSetRequests TASPodSetRequests,
 	assumedUsage map[utiltas.TopologyDomainID]resources.Requests,
 	simulateEmpty bool) (*kueue.TopologyAssignment, string) {
-	topologyRequest := tasPodSetRequests.PodSet.TopologyRequest
 	requests := tasPodSetRequests.SinglePodRequests.Clone()
 	requests.Add(resources.Requests{corev1.ResourcePods: 1})
 	podSetTolerations := tasPodSetRequests.PodSet.Template.Spec.Tolerations
 	count := tasPodSetRequests.Count
-	required := topologyRequest.Required != nil
-	key := levelKey(topologyRequest)
+	required := isRequired(tasPodSetRequests.PodSet.TopologyRequest)
+	key := s.levelKeyWithImpliedFallback(&tasPodSetRequests)
 	if key == nil {
 		return nil, "topology level not specified"
 	}
@@ -392,7 +397,24 @@ func (s *TASFlavorSnapshot) resolveLevelIdx(levelKey string) (int, bool) {
 	return levelIdx, true
 }
 
+func isRequired(tr *kueue.PodSetTopologyRequest) bool {
+	return tr != nil && tr.Required != nil
+}
+
+func (s *TASFlavorSnapshot) levelKeyWithImpliedFallback(tasRequests *TASPodSetRequests) *string {
+	if key := levelKey(tasRequests.PodSet.TopologyRequest); key != nil {
+		return key
+	}
+	if tasRequests.Implied {
+		return ptr.To(s.lowestLevel())
+	}
+	return nil
+}
+
 func levelKey(topologyRequest *kueue.PodSetTopologyRequest) *string {
+	if topologyRequest == nil {
+		return nil
+	}
 	if topologyRequest.Required != nil {
 		return topologyRequest.Required
 	} else if topologyRequest.Preferred != nil {
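Note (not part of the patch): the snapshot changes above make the topology level optional. An explicit Required/Preferred level still wins; a request tagged Implied falls back to the flavor's lowest topology level (the hostname label when nodes form the lowest level). A runnable sketch of the fallback under simplified stand-in types (topologyRequest and flavorSnapshot are illustrations, and the implied flag is passed separately rather than inside a TASPodSetRequests value):

package main

import "fmt"

// topologyRequest stands in for kueue.PodSetTopologyRequest.
type topologyRequest struct {
	required  *string // explicit "required" topology level, if any
	preferred *string // explicit "preferred" topology level, if any
}

// flavorSnapshot stands in for TASFlavorSnapshot.
type flavorSnapshot struct{ levelKeys []string }

// lowestLevel returns the last, most fine-grained topology level.
func (s *flavorSnapshot) lowestLevel() string { return s.levelKeys[len(s.levelKeys)-1] }

// levelKeyWithImpliedFallback mirrors the patch: an explicit level always
// wins; an implied TAS request falls back to the lowest level; otherwise
// there is no level and the caller reports "topology level not specified".
func (s *flavorSnapshot) levelKeyWithImpliedFallback(tr *topologyRequest, implied bool) *string {
	if tr != nil {
		if tr.required != nil {
			return tr.required
		}
		if tr.preferred != nil {
			return tr.preferred
		}
	}
	if implied {
		lvl := s.lowestLevel()
		return &lvl
	}
	return nil
}

func main() {
	s := flavorSnapshot{levelKeys: []string{"cloud.provider.com/block", "kubernetes.io/hostname"}}
	if key := s.levelKeyWithImpliedFallback(nil, true); key != nil {
		fmt.Println(*key) // kubernetes.io/hostname
	}
}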
diff --git a/pkg/scheduler/flavorassigner/tas_flavorassigner.go b/pkg/scheduler/flavorassigner/tas_flavorassigner.go
index 45f65be36f..724b7eba35 100644
--- a/pkg/scheduler/flavorassigner/tas_flavorassigner.go
+++ b/pkg/scheduler/flavorassigner/tas_flavorassigner.go
@@ -29,15 +29,17 @@ import (
 
 // WorkloadsTopologyRequests - returns the TopologyRequests of the workload
 func (a *Assignment) WorkloadsTopologyRequests(wl *workload.Info, cq *cache.ClusterQueueSnapshot) cache.WorkloadTASRequests {
+	isTASOnly := cq.IsTASOnly()
 	tasRequests := make(cache.WorkloadTASRequests)
 	for i, podSet := range wl.Obj.Spec.PodSets {
-		if podSet.TopologyRequest != nil {
+		if podSet.TopologyRequest != nil || isTASOnly {
 			psAssignment := a.podSetAssignmentByName(podSet.Name)
 			if psAssignment.Status.IsError() {
 				// There is no resource quota assignment for the PodSet - no need to check TAS.
 				continue
 			}
-			psTASRequest, err := podSetTopologyRequest(psAssignment, wl, cq, i)
+			isTASImplied := podSet.TopologyRequest == nil && isTASOnly
+			psTASRequest, err := podSetTopologyRequest(psAssignment, wl, cq, isTASImplied, i)
 			if err != nil {
 				psAssignment.error(err)
 			} else {
@@ -51,6 +53,7 @@ func (a *Assignment) WorkloadsTopologyRequests(wl *workload.Info, cq *cache.Clus
 func podSetTopologyRequest(psAssignment *PodSetAssignment,
 	wl *workload.Info,
 	cq *cache.ClusterQueueSnapshot,
+	isTASImplied bool,
 	podSetIndex int) (*cache.TASPodSetRequests, error) {
 	if len(cq.TASFlavors) == 0 {
 		return nil, errors.New("workload requires Topology, but there is no TAS cache information")
@@ -71,6 +74,7 @@ func podSetTopologyRequest(psAssignment *PodSetAssignment,
 		SinglePodRequests: singlePodRequests,
 		PodSet:            podSet,
 		Flavor:            *tasFlvr,
+		Implied:           isTASImplied,
 	}, nil
 }
 
@@ -107,6 +111,10 @@ func checkPodSetAndFlavorMatchForTAS(cq *cache.ClusterQueueSnapshot, ps *kueue.P
 			return ptr.To(fmt.Sprintf("Flavor %q does not contain the requested level", flavor.Name))
 		}
 	}
+	// If the CQ is TAS-only, a PodSet without a TopologyRequest is still admissible; TAS is implied.
+	if ps.TopologyRequest == nil && cq.IsTASOnly() {
+		return nil
+	}
 	// For PodSets which don't use TAS skip resource flavors which are only for TAS
 	if ps.TopologyRequest == nil && flavor.Spec.TopologyName != nil {
 		return ptr.To(fmt.Sprintf("Flavor %q supports only TopologyAwareScheduling", flavor.Name))
diff --git a/pkg/scheduler/scheduler.go b/pkg/scheduler/scheduler.go
index 4264ea3921..594b682e30 100644
--- a/pkg/scheduler/scheduler.go
+++ b/pkg/scheduler/scheduler.go
@@ -457,7 +457,7 @@ func (s *Scheduler) getInitialAssignments(log logr.Logger, wl *workload.Info, sn
 }
 
 func updateAssignmentForTAS(cq *cache.ClusterQueueSnapshot, wl *workload.Info, assignment flavorassigner.Assignment, targets []*preemption.Target) {
-	if features.Enabled(features.TopologyAwareScheduling) && assignment.RepresentativeMode() == flavorassigner.Preempt && wl.IsRequestingTAS() {
+	if features.Enabled(features.TopologyAwareScheduling) && assignment.RepresentativeMode() == flavorassigner.Preempt && (wl.IsRequestingTAS() || cq.IsTASOnly()) {
 		tasRequests := assignment.WorkloadsTopologyRequests(wl, cq)
 		var tasResult cache.TASAssignmentsResult
 		if len(targets) > 0 {
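Note (not part of the patch): in the flavor assigner above, a TAS-only ClusterQueue now pulls every PodSet into TAS, and PodSets without an explicit TopologyRequest are tagged Implied so the snapshot can apply the lowest-level fallback; the scheduler's preemption path is widened with cq.IsTASOnly() for the same reason. A compact model of that loop, using stand-in types (podSet and tasPodSetRequest are simplifications of the real structures):

package main

import "fmt"

type podSet struct {
	name        string
	hasTopology bool // stands in for podSet.TopologyRequest != nil
}

type tasPodSetRequest struct {
	name    string
	implied bool // TAS not requested explicitly, but the CQ is TAS-only
}

// collectTASRequests mirrors WorkloadsTopologyRequests: on a TAS-only
// queue every PodSet gets a TAS request, and those without an explicit
// TopologyRequest are marked implied.
func collectTASRequests(podSets []podSet, isTASOnly bool) []tasPodSetRequest {
	var out []tasPodSetRequest
	for _, ps := range podSets {
		if ps.hasTopology || isTASOnly {
			out = append(out, tasPodSetRequest{
				name:    ps.name,
				implied: !ps.hasTopology && isTASOnly,
			})
		}
	}
	return out
}

func main() {
	reqs := collectTASRequests([]podSet{{name: "one"}}, true)
	fmt.Printf("%+v\n", reqs) // [{name:one implied:true}]
}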
diff --git a/pkg/scheduler/scheduler_test.go b/pkg/scheduler/scheduler_test.go
index c6ddb52ee7..313fba735b 100644
--- a/pkg/scheduler/scheduler_test.go
+++ b/pkg/scheduler/scheduler_test.go
@@ -4285,6 +4285,55 @@ func TestScheduleForTAS(t *testing.T) {
 		// eventCmpOpts are the comparison options for the events
 		eventCmpOpts []cmp.Option
 	}{
+		"workload which does not specify TAS annotation uses the only TAS flavor": {
+			nodes:           defaultSingleNode,
+			topologies:      []kueuealpha.Topology{defaultSingleLevelTopology},
+			resourceFlavors: []kueue.ResourceFlavor{defaultTASFlavor, defaultFlavor},
+			clusterQueues: []kueue.ClusterQueue{
+				*utiltesting.MakeClusterQueue("tas-main").
+					ResourceGroup(
+						*utiltesting.MakeFlavorQuotas("tas-default").
+							Resource(corev1.ResourceCPU, "50").Obj()).
+					Obj(),
+			},
+			workloads: []kueue.Workload{
+				*utiltesting.MakeWorkload("foo", "default").
+					Queue("tas-main").
+					PodSets(*utiltesting.MakePodSet("one", 1).
+						Request(corev1.ResourceCPU, "1").
+						Obj()).
+					Obj(),
+			},
+			wantNewAssignments: map[string]kueue.Admission{
+				"default/foo": *utiltesting.MakeAdmission("tas-main", "one").
+					Assignment(corev1.ResourceCPU, "tas-default", "1000m").
+					AssignmentPodCount(1).
+					TopologyAssignment(&kueue.TopologyAssignment{
+						Levels: utiltas.Levels(&defaultSingleLevelTopology),
+						Domains: []kueue.TopologyDomainAssignment{
+							{
+								Count: 1,
+								Values: []string{
+									"x1",
+								},
+							},
+						},
+					}).Obj(),
+			},
+			eventCmpOpts: []cmp.Option{eventIgnoreMessage},
+			wantEvents: []utiltesting.EventRecord{
+				{
+					Key:       types.NamespacedName{Namespace: "default", Name: "foo"},
+					Reason:    "QuotaReserved",
+					EventType: corev1.EventTypeNormal,
+				},
+				{
+					Key:       types.NamespacedName{Namespace: "default", Name: "foo"},
+					Reason:    "Admitted",
+					EventType: corev1.EventTypeNormal,
+				},
+			},
+		},
 		"workload requiring TAS skips the non-TAS flavor": {
 			nodes:           defaultSingleNode,
 			topologies:      []kueuealpha.Topology{defaultSingleLevelTopology},
diff --git a/pkg/workload/workload.go b/pkg/workload/workload.go
index 8d146a4a6c..7348fb9d2f 100644
--- a/pkg/workload/workload.go
+++ b/pkg/workload/workload.go
@@ -281,7 +281,15 @@ func dropExcludedResources(input corev1.ResourceList, excludedPrefixes []string)
 	return res
 }
 
-// IsRequestingTAS returns information if the workload is requesting TAS
+// IsUsingTAS returns information if the workload is using TAS
+func (i *Info) IsUsingTAS() bool {
+	return slices.ContainsFunc(i.TotalRequests,
+		func(ps PodSetResources) bool {
+			return ps.TopologyRequest != nil
+		})
+}
+
+// IsRequestingTAS returns information if the workload is requesting TAS
 func (i *Info) IsRequestingTAS() bool {
 	return slices.ContainsFunc(i.Obj.Spec.PodSets,
 		func(ps kueue.PodSet) bool {
@@ -291,7 +299,7 @@
 
 // TASUsage returns topology usage requested by the Workload
 func (i *Info) TASUsage() TASUsage {
-	if !features.Enabled(features.TopologyAwareScheduling) || !i.IsRequestingTAS() {
+	if !features.Enabled(features.TopologyAwareScheduling) || !i.IsUsingTAS() {
 		return nil
 	}
 	result := make(TASUsage, 0)
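Note (not part of the patch): the workload.go split matters because with implied TAS the spec no longer tells the whole story. IsRequestingTAS inspects the PodSet specs, so it sees only explicit topology requests; IsUsingTAS inspects the computed per-PodSet TotalRequests, which can carry a topology request even when the spec does not (the implied case), which is presumably why usage accounting (updateWorkloadUsage, TASUsage) now keys off IsUsingTAS. A sketch contrasting the two predicates, with stand-in types (specPodSet and podSetResources loosely mirror workload.Info, and are not the real API):

package main

import (
	"fmt"
	"slices"
)

type specPodSet struct{ topologyRequested bool }   // explicit request in the spec
type podSetResources struct{ hasTopologyReq bool } // resolved request, incl. implied TAS

type info struct {
	specPodSets   []specPodSet
	totalRequests []podSetResources
}

// isRequestingTAS: did the user ask for TAS explicitly in the spec?
func (i *info) isRequestingTAS() bool {
	return slices.ContainsFunc(i.specPodSets,
		func(ps specPodSet) bool { return ps.topologyRequested })
}

// isUsingTAS: did the workload end up with a topology request after
// flavor assignment, whether explicit or implied? Usage accounting
// keys off this broader predicate.
func (i *info) isUsingTAS() bool {
	return slices.ContainsFunc(i.totalRequests,
		func(ps podSetResources) bool { return ps.hasTopologyReq })
}

func main() {
	// Implied TAS on a TAS-only queue: nothing requested in the spec,
	// but the resolved requests carry a topology request.
	wl := info{
		specPodSets:   []specPodSet{{}},
		totalRequests: []podSetResources{{hasTopologyReq: true}},
	}
	fmt.Println(wl.isRequestingTAS(), wl.isUsingTAS()) // false true
}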