Skip to content

Commit

Permalink
Redesign node monitoring to account for Node deletion
Browse files Browse the repository at this point in the history
1. Split node monitoring into two reconcilers, one to monitor Nodes
   and one to monitor and update the designated slack ClusterQueue.
2. Remove entries from in memory caches when a Node is deleted.
3. Watch slack cluster queue to be able to react to changes in
   nominalQuotas and adjust lendingLimits accordingly.

Fixes #252.
  • Loading branch information
dgrove-oss committed Oct 15, 2024
1 parent 4b282d0 commit d293826
Show file tree
Hide file tree
Showing 5 changed files with 249 additions and 96 deletions.
4 changes: 2 additions & 2 deletions internal/controller/appwrapper/appwrapper_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -559,7 +559,7 @@ func (r *AppWrapperReconciler) getPodStatus(ctx context.Context, aw *workloadv1b
if pod.DeletionTimestamp.IsZero() {
summary.running += 1
if checkNoExecuteNodes {
noExecuteNodesMutex.RLock() // BEGIN CRITICAL SECTION
nodeInfoMutex.RLock() // BEGIN CRITICAL SECTION
if len(noExecuteNodes) > 0 {
if resources, ok := noExecuteNodes[pod.Spec.NodeName]; ok {
for badResource := range resources {
Expand All @@ -584,7 +584,7 @@ func (r *AppWrapperReconciler) getPodStatus(ctx context.Context, aw *workloadv1b
}
}
}
noExecuteNodesMutex.RUnlock() // END CRITICAL SECTION
nodeInfoMutex.RUnlock() // END CRITICAL SECTION
}
}
case v1.PodSucceeded:
Expand Down
176 changes: 82 additions & 94 deletions internal/controller/appwrapper/node_health_monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,20 +18,20 @@ package appwrapper

import (
"context"
"maps"
"sync"

v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/resource"
"k8s.io/apimachinery/pkg/types"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/utils/ptr"

ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/event"
"sigs.k8s.io/controller-runtime/pkg/handler"
"sigs.k8s.io/controller-runtime/pkg/log"
kueue "sigs.k8s.io/kueue/apis/kueue/v1beta1"

"github.com/project-codeflare/appwrapper/pkg/config"
)
Expand All @@ -44,51 +44,78 @@ import (
type NodeHealthMonitor struct {
client.Client
Config *config.AppWrapperConfig
Events chan event.GenericEvent // event channel for NodeHealthMonitor to trigger SlackClusterQueueMonitor
}

var (
// noExecuteNodes is a mapping from Node names to resources with an Autopilot NoExecute taint
noExecuteNodes = make(map[string]sets.Set[string])
noExecuteNodesMutex sync.RWMutex
// nodeInfoMutex synchronizes writes by NodeHealthMonitor with reads from AppWrapperReconciler and SlackClusterQueueMonitor
nodeInfoMutex sync.RWMutex

// noScheduleNodes is a mapping from Node names to resource quantities that are unschedulable.
// noExecuteNodes is a mapping from Node names to resources with an Autopilot NoExecute taint
noExecuteNodes = make(map[string]sets.Set[string])

// noScheduleNodes is a mapping from Node names to ResourceLists of unschedulable resources.
// A resource may be unschedulable either because:
// (a) the Node is cordoned (node.Spec.Unschedulable is true) or
// (b) Autopilot has labeled the Node with either a NoExecute or NoSchedule taint.
noScheduleNodes = make(map[string]map[string]*resource.Quantity)
// (b) Autopilot has labeled the Node with a NoExecute or NoSchedule taint for the resource.
noScheduleNodes = make(map[string]v1.ResourceList)
)

// permission to watch nodes
//+kubebuilder:rbac:groups="",resources=nodes,verbs=get;list;watch
//+kubebuilder:rbac:groups=kueue.x-k8s.io,resources=clusterqueues,verbs=get;list;watch;update;patch

func (r *NodeHealthMonitor) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
node := &v1.Node{}
if err := r.Get(ctx, req.NamespacedName, node); err != nil {
return ctrl.Result{}, nil
if errors.IsNotFound(err) {
r.updateForNodeDeletion(ctx, req.Name)
return ctrl.Result{}, nil
}
return ctrl.Result{}, err
}

r.updateNoExecuteNodes(ctx, node)

// If there is a slack ClusterQueue, update its lending limits

if r.Config.SlackQueueName == "" {
return ctrl.Result{}, nil
if node.DeletionTimestamp.IsZero() {
r.updateNoExecuteNodes(ctx, node)
r.updateNoScheduleNodes(ctx, node)
} else {
r.updateForNodeDeletion(ctx, req.Name)
}

cq := &kueue.ClusterQueue{}
if err := r.Get(ctx, types.NamespacedName{Name: r.Config.SlackQueueName}, cq); err != nil {
if errors.IsNotFound(err) {
return ctrl.Result{}, nil // give up if slack quota is not defined
return ctrl.Result{}, nil
}

// Trigger dispatch by means of "*/*" request
func (r *NodeHealthMonitor) triggerDispatch() {
if r.Config.SlackQueueName != "" {
select {
case r.Events <- event.GenericEvent{Object: &metav1.PartialObjectMetadata{ObjectMeta: metav1.ObjectMeta{Namespace: "*", Name: "*"}}}:
default:
// do not block if event is already in channel
}
return ctrl.Result{}, err
}
}

r.updateNoScheduleNodes(ctx, cq, node)

return r.updateLendingLimits(ctx, cq)
// updateForNodeDeletion removes nodeName from the in-memory noExecuteNodes and
// noScheduleNodes caches and, when either cache actually changed, triggers a
// dispatch event so the slack ClusterQueue's lending limits can be recomputed.
func (r *NodeHealthMonitor) updateForNodeDeletion(ctx context.Context, nodeName string) {
	noExecuteDeleted := false
	noScheduleDeleted := false

	nodeInfoMutex.Lock() // BEGIN CRITICAL SECTION
	// Perform the membership checks while holding the mutex so that every
	// access to the shared caches follows the same locking discipline; the
	// previous unlocked check-then-act relied on this reconciler being the
	// sole writer, which is fragile if another writer is ever added.
	if _, ok := noExecuteNodes[nodeName]; ok {
		delete(noExecuteNodes, nodeName)
		noExecuteDeleted = true
	}
	if _, ok := noScheduleNodes[nodeName]; ok {
		delete(noScheduleNodes, nodeName)
		noScheduleDeleted = true
	}
	nodeInfoMutex.Unlock() // END CRITICAL SECTION

	// Safe to log outside the mutex because this method's reconciler is the
	// only writer and the controller runtime does not run it concurrently.
	if noExecuteDeleted {
		r.triggerDispatch()
		log.FromContext(ctx).Info("Updated NoExecute information due to Node deletion",
			"Number NoExecute Nodes", len(noExecuteNodes), "NoExecute Resource Details", noExecuteNodes)
	}
	if noScheduleDeleted {
		r.triggerDispatch()
		log.FromContext(ctx).Info("Updated NoSchedule information due to Node deletion",
			"Number NoSchedule Nodes", len(noScheduleNodes), "NoSchedule Resource Details", noScheduleNodes)
	}
}

// update noExecuteNodes entry for node
func (r *NodeHealthMonitor) updateNoExecuteNodes(ctx context.Context, node *v1.Node) {
noExecuteResources := make(sets.Set[string])
for key, value := range node.GetLabels() {
Expand All @@ -102,7 +129,7 @@ func (r *NodeHealthMonitor) updateNoExecuteNodes(ctx context.Context, node *v1.N
}

noExecuteNodesChanged := false
noExecuteNodesMutex.Lock() // BEGIN CRITICAL SECTION
nodeInfoMutex.Lock() // BEGIN CRITICAL SECTION
if priorEntry, ok := noExecuteNodes[node.GetName()]; ok {
if len(noExecuteResources) == 0 {
delete(noExecuteNodes, node.GetName())
Expand All @@ -115,95 +142,56 @@ func (r *NodeHealthMonitor) updateNoExecuteNodes(ctx context.Context, node *v1.N
noExecuteNodes[node.GetName()] = noExecuteResources
noExecuteNodesChanged = true
}
noExecuteNodesMutex.Unlock() // END CRITICAL SECTION
nodeInfoMutex.Unlock() // END CRITICAL SECTION

// Safe to log outside the mutex because this method is the only writer of noExecuteNodes
// and the controller runtime is configured to not allow concurrent execution of this controller.
if noExecuteNodesChanged {
log.FromContext(ctx).Info("Updated node NoExecute information", "Number NoExecute Nodes", len(noExecuteNodes), "NoExecute Resource Details", noExecuteNodes)
r.triggerDispatch()
log.FromContext(ctx).Info("Updated NoExecute information", "Number NoExecute Nodes", len(noExecuteNodes), "NoExecute Resource Details", noExecuteNodes)
}
}

func (r *NodeHealthMonitor) updateNoScheduleNodes(_ context.Context, cq *kueue.ClusterQueue, node *v1.Node) {
// update unschedulable resource quantities for this node
noScheduleQuantities := make(map[string]*resource.Quantity)
// update noScheduleNodes entry for node
func (r *NodeHealthMonitor) updateNoScheduleNodes(ctx context.Context, node *v1.Node) {
var noScheduleResources v1.ResourceList
if node.Spec.Unschedulable {
// add all non-pod resources covered by cq if the node is cordoned
for _, resourceName := range cq.Spec.ResourceGroups[0].Flavors[0].Resources {
if string(resourceName.Name) != "pods" {
noScheduleQuantities[string(resourceName.Name)] = node.Status.Capacity.Name(resourceName.Name, resource.DecimalSI)
}
}
noScheduleResources = node.Status.Capacity.DeepCopy()
delete(noScheduleResources, v1.ResourcePods)
} else {
noScheduleResources = make(v1.ResourceList)
for key, value := range node.GetLabels() {
for resourceName, taints := range r.Config.Autopilot.ResourceTaints {
for _, taint := range taints {
if key == taint.Key && value == taint.Value {
noScheduleQuantities[resourceName] = node.Status.Capacity.Name(v1.ResourceName(resourceName), resource.DecimalSI)
quantity := node.Status.Capacity.Name(v1.ResourceName(resourceName), resource.DecimalSI)
if !quantity.IsZero() {
noScheduleResources[v1.ResourceName(resourceName)] = *quantity
}
}
}
}
}
}

if len(noScheduleQuantities) > 0 {
noScheduleNodes[node.GetName()] = noScheduleQuantities
} else {
delete(noScheduleNodes, node.GetName())
}
}

func (r *NodeHealthMonitor) updateLendingLimits(ctx context.Context, cq *kueue.ClusterQueue) (ctrl.Result, error) {

// compute unschedulable resource totals
unschedulableQuantities := map[string]*resource.Quantity{}
for _, quantities := range noScheduleNodes {
for resourceName, quantity := range quantities {
if !quantity.IsZero() {
if unschedulableQuantities[resourceName] == nil {
unschedulableQuantities[resourceName] = ptr.To(*quantity)
} else {
unschedulableQuantities[resourceName].Add(*quantity)
}
}
noScheduleNodesChanged := false
nodeInfoMutex.Lock() // BEGIN CRITICAL SECTION
if priorEntry, ok := noScheduleNodes[node.GetName()]; ok {
if len(noScheduleResources) == 0 {
delete(noScheduleNodes, node.GetName())
noScheduleNodesChanged = true
} else if !maps.Equal(priorEntry, noScheduleResources) {
noScheduleNodes[node.GetName()] = noScheduleResources
noScheduleNodesChanged = true
}
} else if len(noScheduleResources) > 0 {
noScheduleNodes[node.GetName()] = noScheduleResources
noScheduleNodesChanged = true
}
nodeInfoMutex.Unlock() // END CRITICAL SECTION

// enforce lending limits on 1st flavor of 1st resource group
resources := cq.Spec.ResourceGroups[0].Flavors[0].Resources
limitsChanged := false
for i, quota := range resources {
var lendingLimit *resource.Quantity
if unschedulableQuantity := unschedulableQuantities[quota.Name.String()]; unschedulableQuantity != nil {
if quota.NominalQuota.Cmp(*unschedulableQuantity) > 0 {
lendingLimit = ptr.To(quota.NominalQuota)
lendingLimit.Sub(*unschedulableQuantity)
} else {
lendingLimit = resource.NewQuantity(0, resource.DecimalSI)
}
}
if quota.LendingLimit == nil && lendingLimit != nil ||
quota.LendingLimit != nil && lendingLimit == nil ||
quota.LendingLimit != nil && lendingLimit != nil && quota.LendingLimit.Cmp(*lendingLimit) != 0 {
limitsChanged = true
resources[i].LendingLimit = lendingLimit
}
}

// update lending limits
if limitsChanged {
err := r.Update(ctx, cq)
if err == nil {
log.FromContext(ctx).Info("Updated lending limits", "Resources", resources)
return ctrl.Result{}, nil
} else if errors.IsConflict(err) {
return ctrl.Result{Requeue: true}, nil
} else {
return ctrl.Result{}, err
}
if noScheduleNodesChanged {
r.triggerDispatch()
log.FromContext(ctx).Info("Updated NoSchedule information", "Number NoSchedule Nodes", len(noScheduleNodes), "NoSchedule Resource Details", noScheduleNodes)
}

return ctrl.Result{}, nil
}

// SetupWithManager sets up the controller with the Manager.
Expand Down
30 changes: 30 additions & 0 deletions internal/controller/appwrapper/node_health_monitor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,14 +24,17 @@ import (
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"sigs.k8s.io/controller-runtime/pkg/event"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
)

var _ = Describe("NodeMonitor Controller", func() {
var slackQueueName = "fake-queue"
var node1Name = types.NamespacedName{Name: "fake-node-1"}
var node2Name = types.NamespacedName{Name: "fake-node-2"}
var dispatch = types.NamespacedName{Name: "*"}
var nodeMonitor *NodeHealthMonitor
var cqMonitor *SlackClusterQueueMonitor
nodeGPUs := v1.ResourceList{v1.ResourceName("nvidia.com/gpu"): resource.MustParse("4")}

BeforeEach(func() {
Expand All @@ -49,9 +52,16 @@ var _ = Describe("NodeMonitor Controller", func() {
// Create reconcilers
awConfig := config.NewAppWrapperConfig()
awConfig.SlackQueueName = slackQueueName
conduit := make(chan event.GenericEvent, 1)
nodeMonitor = &NodeHealthMonitor{
Client: k8sClient,
Config: awConfig,
Events: conduit,
}
cqMonitor = &SlackClusterQueueMonitor{
Client: k8sClient,
Config: awConfig,
Events: conduit,
}
})

Expand Down Expand Up @@ -124,6 +134,8 @@ var _ = Describe("NodeMonitor Controller", func() {
Expect(k8sClient.Update(ctx, node1)).Should(Succeed())
_, err = nodeMonitor.Reconcile(ctx, reconcile.Request{NamespacedName: node1Name})
Expect(err).NotTo(HaveOccurred())
_, err = cqMonitor.Reconcile(ctx, reconcile.Request{NamespacedName: dispatch})
Expect(err).NotTo(HaveOccurred())

Expect(k8sClient.Get(ctx, types.NamespacedName{Name: slackQueueName}, queue)).Should(Succeed())
Expect(queue.Spec.ResourceGroups[0].Flavors[0].Resources[0].LendingLimit.Value()).Should(Equal(int64(2)))
Expand All @@ -134,6 +146,8 @@ var _ = Describe("NodeMonitor Controller", func() {
Expect(k8sClient.Update(ctx, node2)).Should(Succeed())
_, err = nodeMonitor.Reconcile(ctx, reconcile.Request{NamespacedName: node2Name})
Expect(err).NotTo(HaveOccurred())
_, err = cqMonitor.Reconcile(ctx, reconcile.Request{NamespacedName: dispatch})
Expect(err).NotTo(HaveOccurred())

Expect(k8sClient.Get(ctx, types.NamespacedName{Name: slackQueueName}, queue)).Should(Succeed())
Expect(queue.Spec.ResourceGroups[0].Flavors[0].Resources[0].LendingLimit).ShouldNot(BeNil())
Expand All @@ -144,6 +158,8 @@ var _ = Describe("NodeMonitor Controller", func() {
Expect(k8sClient.Update(ctx, node1)).Should(Succeed())
_, err = nodeMonitor.Reconcile(ctx, reconcile.Request{NamespacedName: node1Name})
Expect(err).NotTo(HaveOccurred())
_, err = cqMonitor.Reconcile(ctx, reconcile.Request{NamespacedName: dispatch})
Expect(err).NotTo(HaveOccurred())

Expect(k8sClient.Get(ctx, types.NamespacedName{Name: slackQueueName}, queue)).Should(Succeed())
Expect(queue.Spec.ResourceGroups[0].Flavors[0].Resources[0].LendingLimit).ShouldNot(BeNil())
Expand All @@ -154,6 +170,8 @@ var _ = Describe("NodeMonitor Controller", func() {
Expect(k8sClient.Update(ctx, node2)).Should(Succeed())
_, err = nodeMonitor.Reconcile(ctx, reconcile.Request{NamespacedName: node2Name})
Expect(err).NotTo(HaveOccurred())
_, err = cqMonitor.Reconcile(ctx, reconcile.Request{NamespacedName: dispatch})
Expect(err).NotTo(HaveOccurred())

Expect(k8sClient.Get(ctx, types.NamespacedName{Name: slackQueueName}, queue)).Should(Succeed())
Expect(queue.Spec.ResourceGroups[0].Flavors[0].Resources[0].LendingLimit).Should(BeNil())
Expand All @@ -164,10 +182,22 @@ var _ = Describe("NodeMonitor Controller", func() {
Expect(k8sClient.Update(ctx, node1)).Should(Succeed())
_, err = nodeMonitor.Reconcile(ctx, reconcile.Request{NamespacedName: node1Name})
Expect(err).NotTo(HaveOccurred())
_, err = cqMonitor.Reconcile(ctx, reconcile.Request{NamespacedName: dispatch})
Expect(err).NotTo(HaveOccurred())

Expect(k8sClient.Get(ctx, types.NamespacedName{Name: slackQueueName}, queue)).Should(Succeed())
Expect(queue.Spec.ResourceGroups[0].Flavors[0].Resources[0].LendingLimit.Value()).Should(Equal(int64(2)))

// Increase the slack cluster queue's quota by 2 and expect LendingLimit to increase by 2 to become 4
Expect(k8sClient.Get(ctx, types.NamespacedName{Name: slackQueueName}, queue)).Should(Succeed())
queue.Spec.ResourceGroups[0].Flavors[0].Resources[0].NominalQuota = resource.MustParse("8")
Expect(k8sClient.Update(ctx, queue)).Should(Succeed())
_, err = cqMonitor.Reconcile(ctx, reconcile.Request{NamespacedName: types.NamespacedName{Name: slackQueueName}})
Expect(err).NotTo(HaveOccurred())

Expect(k8sClient.Get(ctx, types.NamespacedName{Name: slackQueueName}, queue)).Should(Succeed())
Expect(queue.Spec.ResourceGroups[0].Flavors[0].Resources[0].LendingLimit.Value()).Should(Equal(int64(4)))

Expect(k8sClient.Delete(ctx, queue)).To(Succeed())
})
})
Loading

0 comments on commit d293826

Please sign in to comment.