Skip to content

Commit

Permalink
fix: propagate priority-class label for leaderworkerset (#4006)
Browse files Browse the repository at this point in the history
Signed-off-by: Abirdcfly <[email protected]>
  • Loading branch information
Abirdcfly authored Jan 23, 2025
1 parent aba0f91 commit fd2519a
Show file tree
Hide file tree
Showing 4 changed files with 51 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,9 @@ func (r *PodReconciler) setDefault(ctx context.Context, pod *corev1.Pod) (bool,

pod.Labels[constants.QueueLabel] = queueName
pod.Labels[podcontroller.GroupNameLabel] = GetWorkloadName(lws, pod.Labels[leaderworkersetv1.GroupIndexLabelKey])
if priorityClass := jobframework.WorkloadPriorityClassName(lws); priorityClass != "" {
pod.Labels[constants.WorkloadPriorityClassLabel] = priorityClass
}
pod.Annotations[podcontroller.GroupTotalCountAnnotation] = fmt.Sprint(ptr.Deref(lws.Spec.LeaderWorkerTemplate.Size, 1))

hash, err := utilpod.GenerateShape(pod.Spec)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"sigs.k8s.io/controller-runtime/pkg/reconcile"
leaderworkersetv1 "sigs.k8s.io/lws/api/leaderworkerset/v1"

"sigs.k8s.io/kueue/pkg/controller/constants"
podcontroller "sigs.k8s.io/kueue/pkg/controller/jobs/pod"
utiltesting "sigs.k8s.io/kueue/pkg/util/testing"
"sigs.k8s.io/kueue/pkg/util/testingjobs/leaderworkerset"
Expand Down Expand Up @@ -121,6 +122,29 @@ func TestPodReconciler(t *testing.T) {
Annotation(podcontroller.RoleHashAnnotation, "7aa6c7b8").
Obj(),
},
"should set default values and priority class when has value": {
lws: leaderworkerset.MakeLeaderWorkerSet("lws", "ns").
Queue("queue").
Label(constants.WorkloadPriorityClassLabel, "test").
Obj(),
pod: testingjobspod.MakePod("pod", "ns").
Label(leaderworkersetv1.SetNameLabelKey, "lws").
Label(leaderworkersetv1.GroupIndexLabelKey, "0").
Annotation(podcontroller.SuspendedByParentAnnotation, FrameworkName).
Annotation(podcontroller.GroupServingAnnotation, "true").
Obj(),
wantPod: testingjobspod.MakePod("pod", "ns").
Label(leaderworkersetv1.SetNameLabelKey, "lws").
Label(leaderworkersetv1.GroupIndexLabelKey, "0").
Label(constants.WorkloadPriorityClassLabel, "test").
Queue("queue").
Group("leaderworkerset-lws-0-97565").
GroupTotalCount("1").
Annotation(podcontroller.SuspendedByParentAnnotation, FrameworkName).
Annotation(podcontroller.GroupServingAnnotation, "true").
Annotation(podcontroller.RoleHashAnnotation, "7aa6c7b8").
Obj(),
},
}
for name, tc := range cases {
t.Run(name, func(t *testing.T) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@ var _ webhook.CustomValidator = &Webhook{}
var (
labelsPath = field.NewPath("metadata", "labels")
queueNameLabelPath = labelsPath.Key(constants.QueueLabel)
priorityClassNamePath = labelsPath.Key(constants.WorkloadPriorityClassLabel)
specPath = field.NewPath("spec")
startupPolicyPath = specPath.Child("startupPolicy")
leaderWorkerTemplatePath = specPath.Child("leaderWorkerTemplate")
Expand Down Expand Up @@ -128,6 +129,10 @@ func (wh *Webhook) ValidateUpdate(ctx context.Context, oldObj, newObj runtime.Ob
jobframework.QueueNameForObject(oldLeaderWorkerSet.Object()),
queueNameLabelPath,
)
allErrs = append(allErrs, jobframework.ValidateUpdateForWorkloadPriorityClassName(
newLeaderWorkerSet.Object(),
oldLeaderWorkerSet.Object(),
)...)

if jobframework.IsManagedByKueue(newLeaderWorkerSet.Object()) {
allErrs = append(allErrs, validateStartupPolicy(newLeaderWorkerSet)...)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
leaderworkersetv1 "sigs.k8s.io/lws/api/leaderworkerset/v1"

"sigs.k8s.io/kueue/pkg/cache"
"sigs.k8s.io/kueue/pkg/controller/constants"
"sigs.k8s.io/kueue/pkg/controller/jobframework"
"sigs.k8s.io/kueue/pkg/controller/jobs/appwrapper"
podcontroller "sigs.k8s.io/kueue/pkg/controller/jobs/pod"
Expand Down Expand Up @@ -244,6 +245,24 @@ func TestValidateUpdate(t *testing.T) {
},
}.ToAggregate(),
},
"change priority class": {
oldObj: testingleaderworkerset.MakeLeaderWorkerSet("test-lws", "").
LeaderTemplate(corev1.PodTemplateSpec{}).
Queue("test-queue").
Label(constants.WorkloadPriorityClassLabel, "test").
Obj(),
newObj: testingleaderworkerset.MakeLeaderWorkerSet("test-lws", "").
LeaderTemplate(corev1.PodTemplateSpec{}).
Queue("test-queue").
Label(constants.WorkloadPriorityClassLabel, "new-test").
Obj(),
wantErr: field.ErrorList{
&field.Error{
Type: field.ErrorTypeInvalid,
Field: priorityClassNamePath.String(),
},
}.ToAggregate(),
},
"change startup policy without queue-name": {
oldObj: testingleaderworkerset.MakeLeaderWorkerSet("test-lws", "").
StartupPolicy(leaderworkersetv1.LeaderCreatedStartupPolicy).
Expand Down

0 comments on commit fd2519a

Please sign in to comment.