Skip to content

Commit

Permalink
TAS: Use TAS implicitly when CQ is TAS-only
Browse files Browse the repository at this point in the history
  • Loading branch information
mimowo committed Mar 7, 2025
1 parent 2022332 commit 4d7afb2
Show file tree
Hide file tree
Showing 8 changed files with 114 additions and 10 deletions.
13 changes: 12 additions & 1 deletion pkg/cache/clusterqueue.go
Original file line number Diff line number Diff line change
Expand Up @@ -508,7 +508,7 @@ func (c *clusterQueue) updateWorkloadUsage(wi *workload.Info, m int64) {
removeUsage(c, fr, q)
}
}
if features.Enabled(features.TopologyAwareScheduling) && wi.IsRequestingTAS() {
if features.Enabled(features.TopologyAwareScheduling) && wi.IsUsingTAS() {
for tasFlavor, tasUsage := range wi.TASUsage() {
if tasFlvCache := c.tasFlavorCache(tasFlavor); tasFlvCache != nil {
if m == 1 {
Expand Down Expand Up @@ -627,3 +627,14 @@ func workloadBelongsToLocalQueue(wl *kueue.Workload, q *kueue.LocalQueue) bool {
func (c *clusterQueue) fairWeight() *resource.Quantity {
return &c.FairWeight
}

// isTASOnly reports whether every flavor referenced by any of the
// ClusterQueue's resource groups is a TAS flavor. Such a queue can use
// Topology-Aware Scheduling implicitly, even for workloads that do not
// carry an explicit topology request.
func (c *clusterQueue) isTASOnly() bool {
	for _, group := range c.ResourceGroups {
		for _, flavorName := range group.Flavors {
			_, isTAS := c.tasFlavors[flavorName]
			if !isTAS {
				// One non-TAS flavor is enough to disqualify the queue.
				return false
			}
		}
	}
	return true
}
5 changes: 5 additions & 0 deletions pkg/cache/clusterqueue_snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ type ClusterQueueSnapshot struct {
hierarchy.ClusterQueue[*CohortSnapshot]

TASFlavors map[kueue.ResourceFlavorReference]*TASFlavorSnapshot
tasOnly bool
}

// RGByResource returns the ResourceGroup which contains capacity
Expand Down Expand Up @@ -214,3 +215,7 @@ func (c *ClusterQueueSnapshot) FindTopologyAssignmentsForWorkload(
}
return result
}

// IsTASOnly reports whether all flavors of the ClusterQueue are TAS
// flavors, as computed at snapshot time from the live cache.
func (c *ClusterQueueSnapshot) IsTASOnly() bool {
return c.tasOnly
}
1 change: 1 addition & 0 deletions pkg/cache/snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,7 @@ func snapshotClusterQueue(c *clusterQueue) *ClusterQueueSnapshot {
AdmissionChecks: utilmaps.DeepCopySets[kueue.ResourceFlavorReference](c.AdmissionChecks),
ResourceNode: c.resourceNode.Clone(),
TASFlavors: make(map[kueue.ResourceFlavorReference]*TASFlavorSnapshot),
tasOnly: c.isTASOnly(),
}
for i, rg := range c.ResourceGroups {
cc.ResourceGroups[i] = rg.Clone()
Expand Down
30 changes: 26 additions & 4 deletions pkg/cache/tas_flavor_snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"github.com/go-logr/logr"
corev1 "k8s.io/api/core/v1"
corev1helpers "k8s.io/component-helpers/scheduling/corev1"
"k8s.io/utils/ptr"

kueue "sigs.k8s.io/kueue/apis/kueue/v1beta1"
"sigs.k8s.io/kueue/pkg/features"
Expand Down Expand Up @@ -157,7 +158,11 @@ func (s *TASFlavorSnapshot) addNode(node corev1.Node) utiltas.TopologyDomainID {
}

// isLowestLevelNode reports whether the lowest topology level is keyed by
// the hostname label, i.e. domains at that level correspond to single nodes.
// Note: the diff rendering duplicated the pre- and post-commit return lines;
// only the post-commit body (delegating to lowestLevel) is kept here.
func (s *TASFlavorSnapshot) isLowestLevelNode() bool {
	return s.lowestLevel() == corev1.LabelHostname
}

// lowestLevel returns the label key of the last, i.e. lowest, topology level.
// Assumes levelKeys is non-empty — TODO confirm callers guarantee this.
func (s *TASFlavorSnapshot) lowestLevel() string {
	lastIdx := len(s.levelKeys) - 1
	return s.levelKeys[lastIdx]
}

// initialize prepares the topology tree structure. This structure holds
Expand Down Expand Up @@ -249,6 +254,7 @@ type TASPodSetRequests struct {
SinglePodRequests resources.Requests
Count int32
Flavor kueue.ResourceFlavorReference
Implied bool
}

func (t *TASPodSetRequests) TotalRequests() resources.Requests {
Expand Down Expand Up @@ -340,13 +346,12 @@ func (s *TASFlavorSnapshot) findTopologyAssignment(
tasPodSetRequests TASPodSetRequests,
assumedUsage map[utiltas.TopologyDomainID]resources.Requests,
simulateEmpty bool) (*kueue.TopologyAssignment, string) {
topologyRequest := tasPodSetRequests.PodSet.TopologyRequest
requests := tasPodSetRequests.SinglePodRequests.Clone()
requests.Add(resources.Requests{corev1.ResourcePods: 1})
podSetTolerations := tasPodSetRequests.PodSet.Template.Spec.Tolerations
count := tasPodSetRequests.Count
required := topologyRequest.Required != nil
key := levelKey(topologyRequest)
required := isRequired(tasPodSetRequests.PodSet.TopologyRequest)
key := s.levelKeyWithImpliedFallback(&tasPodSetRequests)
if key == nil {
return nil, "topology level not specified"
}
Expand Down Expand Up @@ -392,7 +397,24 @@ func (s *TASFlavorSnapshot) resolveLevelIdx(levelKey string) (int, bool) {
return levelIdx, true
}

// isRequired reports whether the topology request is present and specifies
// a required (as opposed to preferred or absent) topology level.
func isRequired(tr *kueue.PodSetTopologyRequest) bool {
	if tr == nil {
		return false
	}
	return tr.Required != nil
}

// levelKeyWithImpliedFallback resolves the topology level key to use for
// the PodSet. An explicit topology request wins; otherwise, when TAS is
// merely implied (TAS-only ClusterQueue, no annotation on the PodSet),
// the flavor's lowest level is used. Returns nil when no level applies.
func (s *TASFlavorSnapshot) levelKeyWithImpliedFallback(tasRequests *TASPodSetRequests) *string {
	explicit := levelKey(tasRequests.PodSet.TopologyRequest)
	switch {
	case explicit != nil:
		return explicit
	case tasRequests.Implied:
		// No explicit request: fall back to the finest-grained level.
		return ptr.To(s.lowestLevel())
	default:
		return nil
	}
}

func levelKey(topologyRequest *kueue.PodSetTopologyRequest) *string {
if topologyRequest == nil {
return nil
}
if topologyRequest.Required != nil {
return topologyRequest.Required
} else if topologyRequest.Preferred != nil {
Expand Down
12 changes: 10 additions & 2 deletions pkg/scheduler/flavorassigner/tas_flavorassigner.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,15 +29,17 @@ import (

// WorkloadsTopologyRequests - returns the TopologyRequests of the workload
func (a *Assignment) WorkloadsTopologyRequests(wl *workload.Info, cq *cache.ClusterQueueSnapshot) cache.WorkloadTASRequests {
isTASOnly := cq.IsTASOnly()
tasRequests := make(cache.WorkloadTASRequests)
for i, podSet := range wl.Obj.Spec.PodSets {
if podSet.TopologyRequest != nil {
if podSet.TopologyRequest != nil || isTASOnly {
psAssignment := a.podSetAssignmentByName(podSet.Name)
if psAssignment.Status.IsError() {
// There is no resource quota assignment for the PodSet - no need to check TAS.
continue
}
psTASRequest, err := podSetTopologyRequest(psAssignment, wl, cq, i)
isTASImplied := podSet.TopologyRequest == nil && isTASOnly
psTASRequest, err := podSetTopologyRequest(psAssignment, wl, cq, isTASImplied, i)
if err != nil {
psAssignment.error(err)
} else {
Expand All @@ -51,6 +53,7 @@ func (a *Assignment) WorkloadsTopologyRequests(wl *workload.Info, cq *cache.Clus
func podSetTopologyRequest(psAssignment *PodSetAssignment,
wl *workload.Info,
cq *cache.ClusterQueueSnapshot,
isTASImplied bool,
podSetIndex int) (*cache.TASPodSetRequests, error) {
if len(cq.TASFlavors) == 0 {
return nil, errors.New("workload requires Topology, but there is no TAS cache information")
Expand All @@ -71,6 +74,7 @@ func podSetTopologyRequest(psAssignment *PodSetAssignment,
SinglePodRequests: singlePodRequests,
PodSet: podSet,
Flavor: *tasFlvr,
Implied: isTASImplied,
}, nil
}

Expand Down Expand Up @@ -107,6 +111,10 @@ func checkPodSetAndFlavorMatchForTAS(cq *cache.ClusterQueueSnapshot, ps *kueue.P
return ptr.To(fmt.Sprintf("Flavor %q does not contain the requested level", flavor.Name))
}
}
// If this is a TAS-only CQ, then no TopologyRequest is ok
if ps.TopologyRequest == nil && cq.IsTASOnly() {
return nil
}
// For PodSets which don't use TAS skip resource flavors which are only for TAS
if ps.TopologyRequest == nil && flavor.Spec.TopologyName != nil {
return ptr.To(fmt.Sprintf("Flavor %q supports only TopologyAwareScheduling", flavor.Name))
Expand Down
2 changes: 1 addition & 1 deletion pkg/scheduler/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -457,7 +457,7 @@ func (s *Scheduler) getInitialAssignments(log logr.Logger, wl *workload.Info, sn
}

func updateAssignmentForTAS(cq *cache.ClusterQueueSnapshot, wl *workload.Info, assignment flavorassigner.Assignment, targets []*preemption.Target) {
if features.Enabled(features.TopologyAwareScheduling) && assignment.RepresentativeMode() == flavorassigner.Preempt && wl.IsRequestingTAS() {
if features.Enabled(features.TopologyAwareScheduling) && assignment.RepresentativeMode() == flavorassigner.Preempt && (wl.IsRequestingTAS() || cq.IsTASOnly()) {
tasRequests := assignment.WorkloadsTopologyRequests(wl, cq)
var tasResult cache.TASAssignmentsResult
if len(targets) > 0 {
Expand Down
49 changes: 49 additions & 0 deletions pkg/scheduler/scheduler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4285,6 +4285,55 @@ func TestScheduleForTAS(t *testing.T) {
// eventCmpOpts are the comparison options for the events
eventCmpOpts []cmp.Option
}{
"workload which does not specify TAS annotation uses the only TAS flavor": {
nodes: defaultSingleNode,
topologies: []kueuealpha.Topology{defaultSingleLevelTopology},
resourceFlavors: []kueue.ResourceFlavor{defaultTASFlavor, defaultFlavor},
clusterQueues: []kueue.ClusterQueue{
*utiltesting.MakeClusterQueue("tas-main").
ResourceGroup(
*utiltesting.MakeFlavorQuotas("tas-default").
Resource(corev1.ResourceCPU, "50").Obj()).
Obj(),
},
workloads: []kueue.Workload{
*utiltesting.MakeWorkload("foo", "default").
Queue("tas-main").
PodSets(*utiltesting.MakePodSet("one", 1).
Request(corev1.ResourceCPU, "1").
Obj()).
Obj(),
},
wantNewAssignments: map[string]kueue.Admission{
"default/foo": *utiltesting.MakeAdmission("tas-main", "one").
Assignment(corev1.ResourceCPU, "tas-default", "1000m").
AssignmentPodCount(1).
TopologyAssignment(&kueue.TopologyAssignment{
Levels: utiltas.Levels(&defaultSingleLevelTopology),
Domains: []kueue.TopologyDomainAssignment{
{
Count: 1,
Values: []string{
"x1",
},
},
},
}).Obj(),
},
eventCmpOpts: []cmp.Option{eventIgnoreMessage},
wantEvents: []utiltesting.EventRecord{
{
Key: types.NamespacedName{Namespace: "default", Name: "foo"},
Reason: "QuotaReserved",
EventType: corev1.EventTypeNormal,
},
{
Key: types.NamespacedName{Namespace: "default", Name: "foo"},
Reason: "Admitted",
EventType: corev1.EventTypeNormal,
},
},
},
"workload requiring TAS skips the non-TAS flavor": {
nodes: defaultSingleNode,
topologies: []kueuealpha.Topology{defaultSingleLevelTopology},
Expand Down
12 changes: 10 additions & 2 deletions pkg/workload/workload.go
Original file line number Diff line number Diff line change
Expand Up @@ -281,7 +281,15 @@ func dropExcludedResources(input corev1.ResourceList, excludedPrefixes []string)
return res
}

// IsUsingTAS reports whether any PodSet in the workload's computed
// TotalRequests carries a topology request, i.e. TAS is actually in use
// for the workload (including implied use on TAS-only ClusterQueues,
// where the workload may not request TAS explicitly).
// Note: the stale pre-rename comment line ("IsRequestingTAS ...") left by
// the diff rendering is dropped; the comment now matches the function name.
func (i *Info) IsUsingTAS() bool {
	return slices.ContainsFunc(i.TotalRequests,
		func(ps PodSetResources) bool {
			return ps.TopologyRequest != nil
		})
}

// IsRequestingTAS returns information if the workload is requesting TAS
func (i *Info) IsRequestingTAS() bool {
return slices.ContainsFunc(i.Obj.Spec.PodSets,
func(ps kueue.PodSet) bool {
Expand All @@ -291,7 +299,7 @@ func (i *Info) IsRequestingTAS() bool {

// TASUsage returns topology usage requested by the Workload
func (i *Info) TASUsage() TASUsage {
if !features.Enabled(features.TopologyAwareScheduling) || !i.IsRequestingTAS() {
if !features.Enabled(features.TopologyAwareScheduling) || !i.IsUsingTAS() {
return nil
}
result := make(TASUsage, 0)
Expand Down

0 comments on commit 4d7afb2

Please sign in to comment.