Skip to content

Commit

Permalink
Merge pull request #14 from converged-computing/add-owner-cleanup
Browse files Browse the repository at this point in the history
add owner cleanup
  • Loading branch information
vsoch authored Aug 4, 2024
2 parents 24ed8d2 + 2c098c7 commit 606e18b
Show file tree
Hide file tree
Showing 16 changed files with 342 additions and 98 deletions.
16 changes: 11 additions & 5 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -182,15 +182,21 @@ SELECT group_name, group_size from pods_provisional;
- [ ] implement other queue strategies (fcfs and backfill with > 1 reservation depth)
- fcfs can work by only adding one job (first in provisional) to the worker queue at once, only when it's empty! lol.
- [ ] create state diagram that shows how stuff works
- [ ] When a job is allocated, we likely need to submit a cancel job that will ensure it can be cancelled when the time runs out
- [x] add the label for the job timeout, default to one hour
- [x] cleanup job is triggered after duration
- [ ] issue cancel to fluxion and delete pods up to parent (how to do this)?
- [ ] Decide what to do on events - currently we delete / cleanup when there is a decided timeout for pod/job
- Arguably, we need to respond to these events for services, etc., where a cleanup job is not scheduled.
- This means we need a way to issue cancel to fluxion, and for fluxion to distinguish between 404 and another error.
- [ ] When a job is not able to schedule, it should go into a rejected queue, which should finish and return a NOT SCHEDULABLE status.
- [ ] In cleanup we will need to handle [BlockOwnerDeletion](https://github.com/kubernetes/kubernetes/blob/dbc2b0a5c7acc349ea71a14e49913661eaf708d2/staging/src/k8s.io/apimachinery/pkg/apis/meta/v1/types.go#L319). I don't yet understand the cases under which this is used, but likely we want to delete the child object and allow the owner to do whatever is the default (create another pod, etc.)

Thinking:

- How do we distinguish between a cancel to fluxion (error) vs. error because it was already cancelled?
- How would that happen?
- Need to walk through deletion / update process - right now we have cleanup event if there is termination time, otherwise we wait for pod event to informer
- We can allow trying to schedule jobs in the future, although I'm not sure about that use case (add label to do this)
- What should we do if a pod is updated, and the group is removed?
- fluxion is deriving the nodes on its own, but we might get updated nodes from the scheduler. It might be good to think about how to use the fluxion-service container instead.
- more efficient to retrieve podspec from kubernetes instead of putting into database?

## License

Expand All @@ -211,4 +217,4 @@ The original fluence code (for which some partial is here) is covered under [LIC

SPDX-License-Identifier: Apache-2.0

LLNL-CODE-764420
LLNL-CODE-764420
23 changes: 20 additions & 3 deletions chart/templates/rbac.yaml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRole
metadata:
name: scheduler-plugins-scheduler
name: {{ .Values.scheduler.name }}
rules:
- apiGroups: [""]
resources: ["namespaces"]
Expand All @@ -26,6 +26,9 @@ rules:
- apiGroups: [""]
resources: ["nodes"]
verbs: ["get", "list", "watch", "patch"]
- apiGroups: ["batch"]
resources: ["jobs"]
verbs: ["delete", "get", "list", "watch", "update"]
- apiGroups: [""]
resources: ["pods"]
verbs: ["delete", "get", "list", "watch", "update"]
Expand Down Expand Up @@ -80,15 +83,29 @@ rules:
kind: ClusterRoleBinding
apiVersion: rbac.authorization.k8s.io/v1
metadata:
name: scheduler-plugins-scheduler
name: {{ .Values.scheduler.name }}
roleRef:
apiGroup: rbac.authorization.k8s.io
kind: ClusterRole
name: scheduler-plugins-scheduler
name: {{ .Values.scheduler.name }}
subjects:
- kind: ServiceAccount
name: {{ .Values.scheduler.name }}
namespace: {{ .Release.Namespace }}
---
kind: RoleBinding
apiVersion: rbac.authorization.k8s.io/v1
metadata:
name: {{ .Values.scheduler.name }}
namespace: {{ .Release.Namespace }}
subjects:
- kind: ServiceAccount
name: {{ .Values.scheduler.name }}
namespace: {{ .Release.Namespace }}
roleRef:
apiGroup: rbac.authorization.k8s.io
kind: Role
name: {{ .Values.scheduler.name }}
---
kind: ClusterRole
apiVersion: rbac.authorization.k8s.io/v1
Expand Down
8 changes: 6 additions & 2 deletions docs/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,14 @@ After install (see the [README](../README.md)) you can create any abstraction (p
|------|-------------|---------|
| "fluxnetes.group-name" | The name of the group | fluxnetes-group-<namespace>-<name> |
| "fluxnetes.group-size" | The size of the group | 1 |
| "fluxnetes.duration" | Duration of the job (seconds) | 3600 |

As you might guess, if you specify `fluxnetes` as the scheduler but don't provide any of the above, the defaults are used. This means a single
pod becomes a single member group. If you set the duration to 0, it will be unlimited. If you set a negative value, you'll get an error.
pod becomes a single member group.

### Duration

Any pod object (or PodSpec template) can accept an `activeDeadlineSeconds` and this is how you should set your job time. Note that if you don't set this value, there will
be no duration (and the pod or object can run forever), which is needed for services, etc.

## Design

Expand Down
15 changes: 15 additions & 0 deletions examples/job-deadline.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
apiVersion: batch/v1
kind: Job
metadata:
name: job-deadline
spec:
template:
spec:
schedulerName: fluxnetes
activeDeadlineSeconds: 5
containers:
- name: job
image: busybox
command: [sleep, "10"]
restartPolicy: Never
backoffLimit: 4
1 change: 0 additions & 1 deletion examples/job.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ spec:
metadata:
labels:
fluxnetes.group-name: job
fluxnetes.duration: "5"
spec:
schedulerName: fluxnetes
containers:
Expand Down
13 changes: 13 additions & 0 deletions examples/pod.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
apiVersion: v1
kind: Pod
metadata:
name: pod
labels:
fluxnetes.group-name: pod
spec:
activeDeadlineSeconds: 5
schedulerName: fluxnetes
containers:
- name: fruit
image: busybox
command: [sleep, "10"]
60 changes: 60 additions & 0 deletions kubernetes/pkg/fluxnetes/events.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
package fluxnetes

import (
corev1 "k8s.io/api/core/v1"
klog "k8s.io/klog/v2"
)

// UpdatePodEvent is called on an update, and the old and new object are presented
func (q *Queue) UpdatePodEvent(oldObj, newObj interface{}) {

pod := oldObj.(*corev1.Pod)
newPod := newObj.(*corev1.Pod)

// a pod is updated, get the group. TODO: how to handle change in group name?
// groupName := groups.GetPodGroupName(oldPod)
switch pod.Status.Phase {
case corev1.PodPending:
klog.Infof("Received update event 'Pending' to '%s' for pod %s/%s", newPod.Status.Phase, pod.Namespace, pod.Name)
case corev1.PodRunning:
klog.Infof("Received update event 'Running' to '%s' for pod %s/%s", newPod.Status.Phase, pod.Namespace, pod.Name)
case corev1.PodSucceeded:
klog.Infof("Received update event 'Succeeded' to '%s' for pod %s/%s", newPod.Status.Phase, pod.Namespace, pod.Name)
case corev1.PodFailed:
klog.Infof("Received update event 'Failed' to '%s' for pod %s/%s", newPod.Status.Phase, pod.Namespace, pod.Name)
case corev1.PodUnknown:
klog.Infof("Received update event 'Unknown' to '%s' for pod %s/%s", newPod.Status.Phase, pod.Namespace, pod.Name)
default:
klog.Infof("Received unknown update event %s for pod %s/%s", newPod.Status.Phase, pod.Status.Phase, pod.Namespace, pod.Name)
}
}

// DeletePodEventhandles the delete event handler
// We don't need to worry about calling cancel to fluxion if the fluxid is already cleaned up
// It has a boolean that won't return an error if the job does not exist.
func (q *Queue) DeletePodEvent(podObj interface{}) {
pod := podObj.(*corev1.Pod)

switch pod.Status.Phase {
case corev1.PodPending:
klog.Infof("Received delete event 'Pending' for pod %s/%s", pod.Namespace, pod.Name)
case corev1.PodRunning:
klog.Infof("Received delete event 'Running' for pod %s/%s", pod.Namespace, pod.Name)
case corev1.PodSucceeded:
klog.Infof("Received delete event 'Succeeded' for pod %s/%s", pod.Namespace, pod.Name)
// TODO insert submit cleanup here - need a way to get the fluxId
// Likely we can keep around the group name and flux id in a database, and get / delete from there.
// err = SubmitCleanup(ctx, pool, pod.Spec.ActiveDeadlineSeconds, job.Args.Podspec, int64(fluxID), true, []string{})
//if err != nil {
// klog.Errorf("Issue cleaning up deleted pod", err)
// }
//}}
case corev1.PodFailed:
klog.Infof("Received delete event 'Failed' for pod %s/%s", pod.Namespace, pod.Name)
case corev1.PodUnknown:
klog.Infof("Received delete event 'Unknown' for pod %s/%s", pod.Namespace, pod.Name)
default:
klog.Infof("Received unknown update event %s for pod %s/%s", pod.Status.Phase, pod.Namespace, pod.Name)
}

}
56 changes: 35 additions & 21 deletions kubernetes/pkg/fluxnetes/group/group.go
Original file line number Diff line number Diff line change
@@ -1,14 +1,18 @@
package group

import (
"context"
"fmt"
"time"

"strconv"

"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"

corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/fluxnetes/defaults"
"k8s.io/apimachinery/pkg/types"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/fluxnetes/labels"
)

Expand All @@ -18,9 +22,7 @@ type PodGroup struct {
Name string
Size int32
Timestamp metav1.MicroTime

// Duration in seconds
Duration int32
Duration int64
}

// getPodGroupName returns the pod group name
Expand Down Expand Up @@ -61,31 +63,43 @@ func GetPodGroupSize(pod *corev1.Pod) (int32, error) {
return int32(size), nil
}

// GetPodGroupDuration gets the runtime of a job in seconds
// We default to an hour (3600 seconds)
func GetPodGroupDuration(pod *corev1.Pod) (int32, error) {
// AddDeadline adds the pod.Spec.ActiveDeadlineSeconds if it isn't set.
func AddDeadline(ctx context.Context, pod *corev1.Pod) error {

// Do we have a group size? This will be parsed as a string, likely
duration, ok := pod.Labels[labels.PodGroupDurationLabel]
if !ok {
duration = "3600"
pod.Labels[labels.PodGroupDurationLabel] = duration
// Cut out early if it is nil - will be added later
if pod.Spec.ActiveDeadlineSeconds == nil {
return nil
}

// We need the group size to be an integer now!
jobDuration, err := strconv.ParseInt(duration, 10, 32)
// Also cut out early with no error if one is set
if *pod.Spec.ActiveDeadlineSeconds > int64(0) {
return nil
}
payload := `{"spec": {"activeDeadlineSeconds": 3600}`
config, err := rest.InClusterConfig()
if err != nil {
return err
}
clientset, err := kubernetes.NewForConfig(config)
if err != nil {
return defaults.DefaultDuration, err
return err
}
_, err = clientset.CoreV1().Pods(pod.Namespace).Patch(ctx, pod.Name, types.MergePatchType, []byte(payload), metav1.PatchOptions{})
return err
}

// GetPodGroupDuration gets the runtime of a job in seconds
// We default to 0, no limit, to allow for services, etc.
func GetPodGroupDuration(pod *corev1.Pod) (int64, error) {

// The duration cannot be negative
if jobDuration < 0 {
return 0, fmt.Errorf("%s label must be >= 0", labels.PodGroupDurationLabel)
// It is already set
if pod.Spec.ActiveDeadlineSeconds != nil && *pod.Spec.ActiveDeadlineSeconds > int64(0) {
return *pod.Spec.ActiveDeadlineSeconds, nil
}
return int32(jobDuration), nil
// We can't enforce everything have a duration, lots of services should not.
return 0, nil
}

// GetPodCreationTimestamp
// GetPodCreationTimestamp returns the creation timestamp as a MicroTime
func GetPodCreationTimestamp(pod *corev1.Pod) metav1.MicroTime {

// This is the first member of the group - use its CreationTimestamp
Expand Down
3 changes: 0 additions & 3 deletions kubernetes/pkg/fluxnetes/labels/labels.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,6 @@ const (
// We use the same label to be consistent
PodGroupLabel = "fluxnetes.group-name"
PodGroupSizeLabel = "fluxnetes.group-size"

// How long should the group run, in seconds (before cancel)
PodGroupDurationLabel = "fluxnetes.duration"
)

// GetPodGroupLabel get pod group name from pod labels
Expand Down
29 changes: 24 additions & 5 deletions kubernetes/pkg/fluxnetes/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,12 @@ import (

"github.com/jackc/pgx/v5"
"github.com/jackc/pgx/v5/pgxpool"

"github.com/riverqueue/river"
"github.com/riverqueue/river/riverdriver/riverpgxv5"
"github.com/riverqueue/river/rivershared/util/slogutil"
"k8s.io/client-go/tools/cache"

"k8s.io/kubernetes/pkg/scheduler/framework"
groups "k8s.io/kubernetes/pkg/scheduler/framework/plugins/fluxnetes/group"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/fluxnetes/queries"
strategies "k8s.io/kubernetes/pkg/scheduler/framework/plugins/fluxnetes/strategy"
Expand All @@ -33,6 +34,7 @@ type Queue struct {
riverClient *river.Client[pgx.Tx]
EventChannel *QueueEvent
Strategy strategies.QueueStrategy
Handle framework.Handle

// IMPORTANT: subscriptions need to use same context
// that client submit them uses
Expand All @@ -55,7 +57,7 @@ type QueueEvent struct {
}

// NewQueue starts a new queue with a river client
func NewQueue(ctx context.Context) (*Queue, error) {
func NewQueue(ctx context.Context, handle framework.Handle) (*Queue, error) {
dbPool, err := pgxpool.New(ctx, os.Getenv("DATABASE_URL"))
if err != nil {
return nil, err
Expand Down Expand Up @@ -105,6 +107,7 @@ func NewQueue(ctx context.Context) (*Queue, error) {
Strategy: strategy,
Context: ctx,
ReservationDepth: depth,
Handle: handle,
}
queue.setupEvents()
return &queue, nil
Expand Down Expand Up @@ -139,6 +142,21 @@ func (q *Queue) setupEvents() {
q.EventChannel = &QueueEvent{Function: trigger, Channel: c}
}

// GetInformer returns the pod informer to run as a go routine
func (q *Queue) GetInformer() cache.SharedIndexInformer {

// Performance improvement when retrieving list of objects by namespace or we'll log 'index not exist' warning.
podsInformer := q.Handle.SharedInformerFactory().Core().V1().Pods().Informer()
podsInformer.AddIndexers(cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc})

// Event handlers to call on update/delete for cleanup
podsInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
UpdateFunc: q.UpdatePodEvent,
DeleteFunc: q.DeletePodEvent,
})
return podsInformer
}

// Enqueue a new job to the provisional queue
// 1. Assemble (discover or define) the group
// 2. Add to provisional table
Expand All @@ -150,13 +168,14 @@ func (q *Queue) Enqueue(pod *corev1.Pod) error {
if err != nil {
return err
}
duration, err := groups.GetPodGroupDuration(pod)

// Get the creation timestamp for the group
ts, err := q.GetCreationTimestamp(pod, groupName)
if err != nil {
return err
}

// Get the creation timestamp for the group
ts, err := q.GetCreationTimestamp(pod, groupName)
duration, err := groups.GetPodGroupDuration(pod)
if err != nil {
return err
}
Expand Down
Loading

0 comments on commit 606e18b

Please sign in to comment.