reorganize NodeMonitor code; no semantic change
Rename variables, fix comments, and refactor into smaller methods
to prepare for actually fixing the bugs related to node deletion.
dgrove-oss committed Oct 14, 2024
1 parent 69b0199 commit 8c69451
Showing 3 changed files with 93 additions and 77 deletions.
34 changes: 17 additions & 17 deletions internal/controller/appwrapper/appwrapper_controller.go
@@ -70,7 +70,7 @@ type podStatusSummary struct {
succeeded int32
failed int32
terminalFailure bool
unhealthyNodes sets.Set[string]
noExecuteNodes sets.Set[string]
}

type componentStatusSummary struct {
@@ -334,13 +334,13 @@ func (r *AppWrapperReconciler) Reconcile(ctx context.Context, req ctrl.Request)
}
}

// Initiate migration of workloads that are using resources that Autopilot has flagged as unhealthy
detailMsg = fmt.Sprintf("Workload contains pods using unhealthy resources on Nodes: %v", podStatus.unhealthyNodes)
if len(podStatus.unhealthyNodes) > 0 {
// Initiate migration of workloads that are using resources that Autopilot has flagged as NoExecute
detailMsg = fmt.Sprintf("Workload contains pods using NoExecute resources on Nodes: %v", podStatus.noExecuteNodes)
if len(podStatus.noExecuteNodes) > 0 {
meta.SetStatusCondition(&aw.Status.Conditions, metav1.Condition{
Type: string(workloadv1beta2.Unhealthy),
Status: metav1.ConditionTrue,
Reason: "AutopilotUnhealthy",
Reason: "AutopilotNoExecute",
Message: detailMsg,
})
r.Recorder.Event(aw, v1.EventTypeNormal, string(workloadv1beta2.Unhealthy), detailMsg)
@@ -549,7 +549,7 @@ func (r *AppWrapperReconciler) getPodStatus(ctx context.Context, aw *workloadv1b
return nil, err
}
summary := &podStatusSummary{expected: pc}
checkUnhealthyNodes := r.Config.Autopilot != nil && r.Config.Autopilot.MonitorNodes
checkNoExecuteNodes := r.Config.Autopilot != nil && r.Config.Autopilot.MonitorNodes

for _, pod := range pods.Items {
switch pod.Status.Phase {
@@ -558,33 +558,33 @@ func (r *AppWrapperReconciler) getPodStatus(ctx context.Context, aw *workloadv1b
case v1.PodRunning:
if pod.DeletionTimestamp.IsZero() {
summary.running += 1
if checkUnhealthyNodes {
unhealthyNodesMutex.RLock() // BEGIN CRITICAL SECTION
if len(unhealthyNodes) > 0 {
if resources, ok := unhealthyNodes[pod.Spec.NodeName]; ok {
if checkNoExecuteNodes {
noExecuteNodesMutex.RLock() // BEGIN CRITICAL SECTION
if len(noExecuteNodes) > 0 {
if resources, ok := noExecuteNodes[pod.Spec.NodeName]; ok {
for badResource := range resources {
for _, container := range pod.Spec.Containers {
if limit, ok := container.Resources.Limits[v1.ResourceName(badResource)]; ok {
if !limit.IsZero() {
if summary.unhealthyNodes == nil {
summary.unhealthyNodes = make(sets.Set[string])
if summary.noExecuteNodes == nil {
summary.noExecuteNodes = make(sets.Set[string])
}
summary.unhealthyNodes.Insert(pod.Spec.NodeName)
summary.noExecuteNodes.Insert(pod.Spec.NodeName)
}
}
if request, ok := container.Resources.Requests[v1.ResourceName(badResource)]; ok {
if !request.IsZero() {
if summary.unhealthyNodes == nil {
summary.unhealthyNodes = make(sets.Set[string])
if summary.noExecuteNodes == nil {
summary.noExecuteNodes = make(sets.Set[string])
}
summary.unhealthyNodes.Insert(pod.Spec.NodeName)
summary.noExecuteNodes.Insert(pod.Spec.NodeName)
}
}
}
}
}
}
unhealthyNodesMutex.RUnlock() // END CRITICAL SECTION
noExecuteNodesMutex.RUnlock() // END CRITICAL SECTION
}
}
case v1.PodSucceeded:
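
For readers skimming the hunk above: the new read-side check in getPodStatus boils down to the condensed sketch below. This is illustrative only, not code from this commit; the helper name podUsesNoExecuteResource is invented here for exposition.

// Illustrative only (not part of this commit): the read-side pattern used by
// getPodStatus, condensed into a helper. The helper name is invented here.
func podUsesNoExecuteResource(pod *v1.Pod) bool {
	noExecuteNodesMutex.RLock()         // BEGIN CRITICAL SECTION
	defer noExecuteNodesMutex.RUnlock() // END CRITICAL SECTION
	resources, ok := noExecuteNodes[pod.Spec.NodeName]
	if !ok {
		return false
	}
	for badResource := range resources {
		for _, container := range pod.Spec.Containers {
			// A pod is affected if any container requests or limits the flagged resource.
			if limit, ok := container.Resources.Limits[v1.ResourceName(badResource)]; ok && !limit.IsZero() {
				return true
			}
			if request, ok := container.Resources.Requests[v1.ResourceName(badResource)]; ok && !request.IsZero() {
				return true
			}
		}
	}
	return false
}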
120 changes: 68 additions & 52 deletions internal/controller/appwrapper/node_health_monitor.go
@@ -36,74 +36,46 @@ import (
"github.com/project-codeflare/appwrapper/pkg/config"
)

// NodeHealthMonitor maintains the set of nodes that Autopilot has labelled as unhealthy
// NodeHealthMonitor watches Nodes and maintains mappings of Nodes that have either
// been marked as Unschedulable or that have been labeled to indicate that
// they have resources that Autopilot has tainted as NoSchedule or NoExecute.
// This information is used to automate the maintenance of the lendingLimit of
// a designated slack ClusterQueue and to migrate running workloads away from NoExecute resources.
type NodeHealthMonitor struct {
client.Client
Config *config.AppWrapperConfig
}

var (
// unhealthyNodes is a mapping from Node names to a set of resources that Autopilot has labeled as unhealthy on that Node
unhealthyNodes = make(map[string]sets.Set[string])
unhealthyNodesMutex sync.RWMutex

// unschedulableNodes is a mapping from Node names to resource quantities than Autopilot has labeled as unschedulable on that Node
unschedulableNodes = make(map[string]map[string]*resource.Quantity)
// noExecuteNodes is a mapping from Node names to resources with an Autopilot NoExecute taint
noExecuteNodes = make(map[string]sets.Set[string])
noExecuteNodesMutex sync.RWMutex

// noScheduleNodes is a mapping from Node names to resource quantities that are unschedulable.
// A resource may be unschedulable either because:
// (a) the Node is cordoned (node.Spec.Unschedulable is true) or
// (b) Autopilot has labeled the Node with either a NoExecute or NoSchedule taint.
noScheduleNodes = make(map[string]map[string]*resource.Quantity)
)

// permission to watch nodes
//+kubebuilder:rbac:groups="",resources=nodes,verbs=get;list;watch
//+kubebuilder:rbac:groups=kueue.x-k8s.io,resources=clusterqueues,verbs=get;list;watch;update;patch

//gocyclo:ignore
func (r *NodeHealthMonitor) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
node := &v1.Node{}
if err := r.Get(ctx, req.NamespacedName, node); err != nil {
return ctrl.Result{}, nil
}

flaggedResources := make(sets.Set[string])
for key, value := range node.GetLabels() {
for resourceName, taints := range r.Config.Autopilot.ResourceTaints {
for _, taint := range taints {
if key == taint.Key && value == taint.Value && taint.Effect == v1.TaintEffectNoExecute {
flaggedResources.Insert(resourceName)
}
}
}
}

nodeChanged := false
unhealthyNodesMutex.Lock() // BEGIN CRITICAL SECTION
if priorEntry, ok := unhealthyNodes[node.GetName()]; ok {
if len(flaggedResources) == 0 {
delete(unhealthyNodes, node.GetName())
nodeChanged = true
} else if !priorEntry.Equal(flaggedResources) {
unhealthyNodes[node.GetName()] = flaggedResources
nodeChanged = true
}
} else if len(flaggedResources) > 0 {
unhealthyNodes[node.GetName()] = flaggedResources
nodeChanged = true
}
unhealthyNodesMutex.Unlock() // END CRITICAL SECTION
r.updateNoExecuteNodes(ctx, node)

// Unsynchronized reads of unhealthyNodes below are safe because this method
// is the only writer to the map and the controller runtime is configured to
// not allow concurrent execution of this method.

if nodeChanged {
log.FromContext(ctx).Info("Updated node health information", "Number Unhealthy Nodes", len(unhealthyNodes), "Unhealthy Resource Details", unhealthyNodes)
}

// update lending limits on slack quota if configured
// If there is a slack ClusterQueue, update its lending limits

if r.Config.SlackQueueName == "" {
return ctrl.Result{}, nil
}

// get slack quota
cq := &kueue.ClusterQueue{}
if err := r.Get(ctx, types.NamespacedName{Name: r.Config.SlackQueueName}, cq); err != nil {
if errors.IsNotFound(err) {
@@ -112,36 +112,84 @@ func (r *NodeHealthMonitor) Reconcile(ctx context.Context, req ctrl.Request) (ct
return ctrl.Result{}, err
}

r.updateNoScheduleNodes(ctx, cq, node)

return r.updateLendingLimits(ctx, cq)
}

func (r *NodeHealthMonitor) updateNoExecuteNodes(ctx context.Context, node *v1.Node) {
noExecuteResources := make(sets.Set[string])
for key, value := range node.GetLabels() {
for resourceName, taints := range r.Config.Autopilot.ResourceTaints {
for _, taint := range taints {
if key == taint.Key && value == taint.Value && taint.Effect == v1.TaintEffectNoExecute {
noExecuteResources.Insert(resourceName)
}
}
}
}

noExecuteNodesChanged := false
noExecuteNodesMutex.Lock() // BEGIN CRITICAL SECTION
if priorEntry, ok := noExecuteNodes[node.GetName()]; ok {
if len(noExecuteResources) == 0 {
delete(noExecuteNodes, node.GetName())
noExecuteNodesChanged = true
} else if !priorEntry.Equal(noExecuteResources) {
noExecuteNodes[node.GetName()] = noExecuteResources
noExecuteNodesChanged = true
}
} else if len(noExecuteResources) > 0 {
noExecuteNodes[node.GetName()] = noExecuteResources
noExecuteNodesChanged = true
}
noExecuteNodesMutex.Unlock() // END CRITICAL SECTION

// Safe to log outside the mutex because this method is the only writer of noExecuteNodes
// and the controller runtime is configured to not allow concurrent execution of this controller.
if noExecuteNodesChanged {
log.FromContext(ctx).Info("Updated node NoExecute information", "Number NoExecute Nodes", len(noExecuteNodes), "NoExecute Resource Details", noExecuteNodes)
}
}

func (r *NodeHealthMonitor) updateNoScheduleNodes(_ context.Context, cq *kueue.ClusterQueue, node *v1.Node) {
// update unschedulable resource quantities for this node
flaggedQuantities := make(map[string]*resource.Quantity)
noScheduleQuantities := make(map[string]*resource.Quantity)
if node.Spec.Unschedulable {
// flag all non-pod resources covered by cq if the node is cordoned
// add all non-pod resources covered by cq if the node is cordoned
for _, resourceName := range cq.Spec.ResourceGroups[0].Flavors[0].Resources {
if string(resourceName.Name) != "pods" {
flaggedQuantities[string(resourceName.Name)] = node.Status.Capacity.Name(resourceName.Name, resource.DecimalSI)
noScheduleQuantities[string(resourceName.Name)] = node.Status.Capacity.Name(resourceName.Name, resource.DecimalSI)
}
}
} else {
for key, value := range node.GetLabels() {
for resourceName, taints := range r.Config.Autopilot.ResourceTaints {
for _, taint := range taints {
if key == taint.Key && value == taint.Value {
flaggedQuantities[resourceName] = node.Status.Capacity.Name(v1.ResourceName(resourceName), resource.DecimalSI)
noScheduleQuantities[resourceName] = node.Status.Capacity.Name(v1.ResourceName(resourceName), resource.DecimalSI)
}
}
}
}
}

if len(flaggedQuantities) > 0 {
unschedulableNodes[node.GetName()] = flaggedQuantities
if len(noScheduleQuantities) > 0 {
noScheduleNodes[node.GetName()] = noScheduleQuantities
} else {
delete(unschedulableNodes, node.GetName())
delete(noScheduleNodes, node.GetName())
}
}

func (r *NodeHealthMonitor) updateLendingLimits(ctx context.Context, cq *kueue.ClusterQueue) (ctrl.Result, error) {

// compute unschedulable resource totals
unschedulableQuantities := map[string]*resource.Quantity{}
for _, quantities := range unschedulableNodes {
for _, quantities := range noScheduleNodes {
for resourceName, quantity := range quantities {
if !quantity.IsZero() {
if unschedulableQuantities[resourceName] == nil {
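
The remainder of updateLendingLimits is collapsed in this view. As a rough sketch of what turning the unschedulable totals into lending limits on the slack ClusterQueue could look like (not the actual implementation; it assumes the Kueue ResourceQuota fields NominalQuota and LendingLimit, and the policy that the lending limit is the nominal quota reduced by the unschedulable total):

// Illustrative sketch only -- the real body of updateLendingLimits is collapsed
// in the diff above. Field names follow the Kueue v1beta1 ClusterQueue API; the
// subtraction policy is an assumption, and a real implementation would also
// clamp negative limits at zero.
limitsChanged := false
resources := cq.Spec.ResourceGroups[0].Flavors[0].Resources
for i, quota := range resources {
	if delta, ok := unschedulableQuantities[string(quota.Name)]; ok {
		limit := quota.NominalQuota.DeepCopy()
		limit.Sub(*delta)
		resources[i].LendingLimit = &limit
		limitsChanged = true
	} else if quota.LendingLimit != nil {
		// Nothing unschedulable for this resource; restore full lending.
		resources[i].LendingLimit = nil
		limitsChanged = true
	}
}
if limitsChanged {
	if err := r.Update(ctx, cq); err != nil {
		return ctrl.Result{}, err
	}
}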
16 changes: 8 additions & 8 deletions internal/controller/appwrapper/node_health_monitor_test.go
@@ -74,7 +74,7 @@ var _ = Describe("NodeMonitor Controller", func() {
Expect(err).NotTo(HaveOccurred())

By("Healthy cluster has no unhealthy nodes")
Expect(len(unhealthyNodes)).Should(Equal(0))
Expect(len(noExecuteNodes)).Should(Equal(0))

By("A node labeled EVICT is detected as unhealthy")
node := getNode(node1Name.Name)
@@ -84,25 +84,25 @@ var _ = Describe("NodeMonitor Controller", func() {
Expect(err).NotTo(HaveOccurred())
_, err = nodeMonitor.Reconcile(ctx, reconcile.Request{NamespacedName: node2Name})
Expect(err).NotTo(HaveOccurred())
Expect(len(unhealthyNodes)).Should(Equal(1))
Expect(unhealthyNodes).Should(HaveKey(node1Name.Name))
Expect(unhealthyNodes[node1Name.Name]).Should(HaveKey("nvidia.com/gpu"))
Expect(len(noExecuteNodes)).Should(Equal(1))
Expect(noExecuteNodes).Should(HaveKey(node1Name.Name))
Expect(noExecuteNodes[node1Name.Name]).Should(HaveKey("nvidia.com/gpu"))

By("Repeated reconcile does not change map")
_, err = nodeMonitor.Reconcile(ctx, reconcile.Request{NamespacedName: node1Name})
Expect(err).NotTo(HaveOccurred())
_, err = nodeMonitor.Reconcile(ctx, reconcile.Request{NamespacedName: node2Name})
Expect(err).NotTo(HaveOccurred())
Expect(len(unhealthyNodes)).Should(Equal(1))
Expect(unhealthyNodes).Should(HaveKey(node1Name.Name))
Expect(unhealthyNodes[node1Name.Name]).Should(HaveKey("nvidia.com/gpu"))
Expect(len(noExecuteNodes)).Should(Equal(1))
Expect(noExecuteNodes).Should(HaveKey(node1Name.Name))
Expect(noExecuteNodes[node1Name.Name]).Should(HaveKey("nvidia.com/gpu"))

By("Removing the EVICT label updates unhealthyNodes")
node.Labels["autopilot.ibm.com/gpuhealth"] = "ERR"
Expect(k8sClient.Update(ctx, node)).Should(Succeed())
_, err = nodeMonitor.Reconcile(ctx, reconcile.Request{NamespacedName: node1Name})
Expect(err).NotTo(HaveOccurred())
Expect(len(unhealthyNodes)).Should(Equal(0))
Expect(len(noExecuteNodes)).Should(Equal(0))
})

It("ClusterQueue Lending Adjustment", func() {
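
The test flips the autopilot.ibm.com/gpuhealth label between EVICT and ERR and expects only EVICT to land node1 in noExecuteNodes under the nvidia.com/gpu resource. A configuration consistent with that behavior might look like the sketch below; the AutopilotConfig struct name and the []v1.Taint element type are assumptions inferred from the loops in updateNoExecuteNodes, not copied from the test's actual setup.

// Illustrative reconstruction only -- not taken from the repository.
autopilotConfig := &config.AutopilotConfig{
	MonitorNodes: true,
	ResourceTaints: map[string][]v1.Taint{
		// Labeling a Node autopilot.ibm.com/gpuhealth=EVICT marks its
		// nvidia.com/gpu resource as NoExecute; relabeling it to "ERR" no
		// longer matches a NoExecute taint, which is why the test expects
		// noExecuteNodes to be emptied after the label changes.
		"nvidia.com/gpu": {
			{Key: "autopilot.ibm.com/gpuhealth", Value: "EVICT", Effect: v1.TaintEffectNoExecute},
		},
	},
}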
