Skip to content

Commit

Permalink
Append autopilot anti-affinities to existing matchExpressions array
Browse files Browse the repository at this point in the history
Fixes #259.

Fix the actual bug and also add unit tests to cover all PodSpecTemplate
injections by the appwrapper controller.
  • Loading branch information
dgrove-oss committed Oct 25, 2024
1 parent 20de4e7 commit 95c0a78
Show file tree
Hide file tree
Showing 3 changed files with 175 additions and 17 deletions.
88 changes: 87 additions & 1 deletion internal/controller/appwrapper/appwrapper_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
. "github.com/onsi/gomega"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/meta"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/tools/record"
Expand Down Expand Up @@ -63,6 +64,8 @@ var _ = Describe("AppWrapper Controller", func() {
awConfig.FaultTolerance.RetryPausePeriod = 0 * time.Second
awConfig.FaultTolerance.RetryLimit = 0
awConfig.FaultTolerance.SuccessTTL = 0 * time.Second
awConfig.Autopilot.ResourceTaints["nvidia.com/gpu"] = append(awConfig.Autopilot.ResourceTaints["nvidia.com/gpu"], v1.Taint{Key: "extra", Value: "test", Effect: v1.TaintEffectNoExecute})

awReconciler = &AppWrapperReconciler{
Client: k8sClient,
Recorder: &record.FakeRecorder{},
Expand Down Expand Up @@ -156,6 +159,42 @@ var _ = Describe("AppWrapper Controller", func() {
Expect(finished).Should(BeFalse())
}

validateMarkers := func(p *v1.Pod) {
for k, v := range markerPodSet.Annotations {
Expect(p.Annotations).Should(HaveKeyWithValue(k, v))
}
for k, v := range markerPodSet.Labels {
Expect(p.Labels).Should(HaveKeyWithValue(k, v))
}
for _, v := range markerPodSet.Tolerations {
Expect(p.Spec.Tolerations).Should(ContainElement(v))
}
for k, v := range markerPodSet.NodeSelector {
Expect(p.Spec.NodeSelector).Should(HaveKeyWithValue(k, v))
}
}

validateAutopilot := func(p *v1.Pod) {
if p.Spec.Containers[0].Resources.Requests.Name("nvidia.com/gpu", resource.DecimalSI).IsZero() {
Expect(p.Spec.Affinity).Should(BeNil())
} else {
Expect(p.Spec.Affinity.NodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution).ShouldNot(BeNil())
Expect(p.Spec.Affinity.NodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution.NodeSelectorTerms).Should(HaveLen(1))
mes := p.Spec.Affinity.NodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution.NodeSelectorTerms[0].MatchExpressions
for _, taint := range awReconciler.Config.Autopilot.ResourceTaints["nvidia.com/gpu"] {
found := false
for _, me := range mes {
if me.Key == taint.Key {
Expect(me.Operator).Should(Equal(v1.NodeSelectorOpNotIn))
Expect(me.Values).Should(ContainElement(taint.Value))
found = true
}
}
Expect(found).Should(BeTrue())
}
}
}

AfterEach(func() {
By("Cleanup the AppWrapper and ensure no Pods remain")
aw := &workloadv1beta2.AppWrapper{}
Expand Down Expand Up @@ -318,6 +357,54 @@ var _ = Describe("AppWrapper Controller", func() {
Expect(err).NotTo(HaveOccurred())
Expect(podStatus.pending).Should(Equal(int32(1)))
})

It("Validating PodSet Injection invariants on minimal pods", func() {
advanceToResuming(pod(100, 0, false), pod(100, 1, true))
beginRunning()
aw := getAppWrapper(awName)
pods := getPods(aw)
Expect(pods).Should(HaveLen(2))

By("Validate expected markers and Autopilot anti-affinities were injected")
for _, p := range pods {
Expect(p.Labels).Should(HaveKeyWithValue(AppWrapperLabel, awName.Name))
validateMarkers(&p)
validateAutopilot(&p)
}
})

It("Validating PodSet Injection invariants on complex pods", func() {
advanceToResuming(complexPodYaml(), complexPodYaml())
beginRunning()
aw := getAppWrapper(awName)
pods := getPods(aw)
Expect(pods).Should(HaveLen(2))

By("Validate expected markers and Autopilot anti-affinities were injected")
for _, p := range pods {
Expect(p.Labels).Should(HaveKeyWithValue(AppWrapperLabel, awName.Name))
validateMarkers(&p)
validateAutopilot(&p)
}

By("Validate complex pod elements were not removed")
for _, p := range pods {
Expect(p.Labels).Should(HaveKeyWithValue("myComplexLabel", "myComplexValue"))
Expect(p.Annotations).Should(HaveKeyWithValue("myComplexAnnotation", "myComplexValue"))
Expect(p.Spec.NodeSelector).Should(HaveKeyWithValue("myComplexSelector", "myComplexValue"))
Expect(p.Spec.Tolerations).Should(ContainElement(v1.Toleration{Key: "myComplexKey", Value: "myComplexValue", Operator: v1.TolerationOpEqual, Effect: v1.TaintEffectNoSchedule}))
mes := p.Spec.Affinity.NodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution.NodeSelectorTerms[0].MatchExpressions
found := false
for _, me := range mes {
if me.Key == "kubernetes.io/hostname" {
Expect(me.Operator).Should(Equal(v1.NodeSelectorOpNotIn))
Expect(me.Values).Should(ContainElement("badHost1"))
found = true
}
}
Expect(found).Should(BeTrue())
}
})
})

var _ = Describe("AppWrapper Annotations", func() {
Expand Down Expand Up @@ -433,5 +520,4 @@ var _ = Describe("AppWrapper Annotations", func() {
Expect(awReconciler.terminalExitCodes(ctx, aw)).Should(Equal([]int{3, 10, 42}))
Expect(awReconciler.retryableExitCodes(ctx, aw)).Should(Equal([]int{10, 20}))
})

})
61 changes: 61 additions & 0 deletions internal/controller/appwrapper/fixtures_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,19 @@ func getNode(name string) *v1.Node {
return node
}

func getPods(aw *workloadv1beta2.AppWrapper) []v1.Pod {
result := []v1.Pod{}
podList := &v1.PodList{}
err := k8sClient.List(ctx, podList, &client.ListOptions{Namespace: aw.Namespace})
Expect(err).NotTo(HaveOccurred())
for _, pod := range podList.Items {
if awn, found := pod.Labels[AppWrapperLabel]; found && awn == aw.Name {
result = append(result, pod)
}
}
return result
}

// envTest doesn't have a Pod controller; so simulate it
func setPodStatus(aw *workloadv1beta2.AppWrapper, phase v1.PodPhase, numToChange int32) error {
podList := &v1.PodList{}
Expand Down Expand Up @@ -128,6 +141,54 @@ func pod(milliCPU int64, numGPU int64, declarePodSets bool) workloadv1beta2.AppW
return *awc
}

const complexPodYAML = `
apiVersion: v1
kind: Pod
metadata:
name: %v
labels:
myComplexLabel: myComplexValue
annotations:
myComplexAnnotation: myComplexValue
spec:
restartPolicy: Never
nodeSelector:
myComplexSelector: myComplexValue
affinity:
nodeAffinity:
requiredDuringSchedulingIgnoredDuringExecution:
nodeSelectorTerms:
- matchExpressions:
- key: kubernetes.io/hostname
operator: NotIn
values:
- badHost1
tolerations:
- key: myComplexKey
value: myComplexValue
operator: Equal
effect: NoSchedule
containers:
- name: busybox
image: quay.io/project-codeflare/busybox:1.36
command: ["sh", "-c", "sleep 10"]
resources:
requests:
cpu: 100m
nvidia.com/gpu: 1
limits:
nvidia.com/gpu: 1`

func complexPodYaml() workloadv1beta2.AppWrapperComponent {
yamlString := fmt.Sprintf(complexPodYAML, randName("pod"))
jsonBytes, err := yaml.YAMLToJSON([]byte(yamlString))
Expect(err).NotTo(HaveOccurred())
awc := &workloadv1beta2.AppWrapperComponent{
Template: runtime.RawExtension{Raw: jsonBytes},
}
return *awc
}

const malformedPodYAML = `
apiVersion: v1
kind: Pod
Expand Down
43 changes: 27 additions & 16 deletions internal/controller/appwrapper/resource_management.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ func hasResourceRequest(spec map[string]interface{}, resource string) bool {
return false
}

func addNodeSelectorsToAffinity(spec map[string]interface{}, selectorTerms []v1.NodeSelectorTerm) error {
func addNodeSelectorsToAffinity(spec map[string]interface{}, exprsToAdd []v1.NodeSelectorRequirement) error {
if _, ok := spec["affinity"]; !ok {
spec["affinity"] = map[string]interface{}{}
}
Expand All @@ -131,24 +131,37 @@ func addNodeSelectorsToAffinity(spec map[string]interface{}, selectorTerms []v1.
return fmt.Errorf("spec.affinity.nodeAffinity.requiredDuringSchedulingIgnoredDuringExecution is not a map")
}
if _, ok := nodeSelector["nodeSelectorTerms"]; !ok {
nodeSelector["nodeSelectorTerms"] = []interface{}{}
nodeSelector["nodeSelectorTerms"] = []interface{}{map[string]interface{}{}}
}
existingTerms, ok := nodeSelector["nodeSelectorTerms"].([]interface{})
if !ok {
return fmt.Errorf("spec.affinity.nodeAffinity.requiredDuringSchedulingIgnoredDuringExecution.nodeSelectorTerms is not an array")
}
for _, termToAdd := range selectorTerms {
bytes, err := json.Marshal(termToAdd)
if err != nil {
return fmt.Errorf("marshalling selectorTerm %v: %w", termToAdd, err)
for idx, term := range existingTerms {
selTerm, ok := term.(map[string]interface{})
if !ok {
return fmt.Errorf("spec.affinity.nodeAffinity.requiredDuringSchedulingIgnoredDuringExecution.nodeSelectorTerms[%v] is not an map", idx)
}
var obj interface{}
if err = json.Unmarshal(bytes, &obj); err != nil {
return fmt.Errorf("unmarshalling selectorTerm %v: %w", termToAdd, err)
if _, ok := selTerm["matchExpressions"]; !ok {
selTerm["matchExpressions"] = []interface{}{}
}
matchExpressions, ok := selTerm["matchExpressions"].([]interface{})
if !ok {
return fmt.Errorf("spec.affinity.nodeAffinity.requiredDuringSchedulingIgnoredDuringExecution.nodeSelectorTerms[%v].matchExpressions is not an map", idx)
}
for _, expr := range exprsToAdd {
bytes, err := json.Marshal(expr)
if err != nil {
return fmt.Errorf("marshalling selectorTerm %v: %w", expr, err)
}
var obj interface{}
if err = json.Unmarshal(bytes, &obj); err != nil {
return fmt.Errorf("unmarshalling selectorTerm %v: %w", expr, err)
}
matchExpressions = append(matchExpressions, obj)
}
existingTerms = append(existingTerms, obj)
selTerm["matchExpressions"] = matchExpressions
}
nodeSelector["nodeSelectorTerms"] = existingTerms

return nil
}
Expand Down Expand Up @@ -262,13 +275,11 @@ func (r *AppWrapperReconciler) createComponent(ctx context.Context, aw *workload
}
}
if len(toAdd) > 0 {
nodeSelectors := []v1.NodeSelectorTerm{}
matchExpressions := []v1.NodeSelectorRequirement{}
for k, v := range toAdd {
nodeSelectors = append(nodeSelectors, v1.NodeSelectorTerm{
MatchExpressions: []v1.NodeSelectorRequirement{{Operator: v1.NodeSelectorOpNotIn, Key: k, Values: v}},
})
matchExpressions = append(matchExpressions, v1.NodeSelectorRequirement{Operator: v1.NodeSelectorOpNotIn, Key: k, Values: v})
}
if err := addNodeSelectorsToAffinity(spec, nodeSelectors); err != nil {
if err := addNodeSelectorsToAffinity(spec, matchExpressions); err != nil {
log.FromContext(ctx).Error(err, "failed to inject Autopilot affinities")
}
}
Expand Down

0 comments on commit 95c0a78

Please sign in to comment.