Skip to content

Commit

Permalink
add owner cleanup
Browse files Browse the repository at this point in the history
In the case that a duration is set (the pod or pod
template activeDeadlineSeconds) kick off a cleanup
job to remove it from Kubernetes and the scheduler.
Other deletions will need to be handled with events,
which I will do next! I also changed my mind about
the default deletion of 3600 - I think that can be
dangerous for services or other stuffs that should
not be expected to just end. We need to trust the
users creating the abstractions. Also, pods that
are associated with jobs clean themselves up, so
we do not want to double mess with that.

Signed-off-by: vsoch <[email protected]>
  • Loading branch information
vsoch committed Aug 4, 2024
1 parent 24ed8d2 commit 915ed2d
Show file tree
Hide file tree
Showing 9 changed files with 144 additions and 41 deletions.
3 changes: 3 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -187,10 +187,13 @@ SELECT group_name, group_size from pods_provisional;
- [x] cleanup job is triggered after duration
- [ ] issue cancel to fluxion and delete pods up to parent (how to do this)?
- [ ] When a job is not able to schedule, it should go into a rejected queue, which should finish and return a NOT SCHEDULABLE status.
- [ ] In cleanup we will need to handle [BlockOwnerDeletion](https://github.com/kubernetes/kubernetes/blob/dbc2b0a5c7acc349ea71a14e49913661eaf708d2/staging/src/k8s.io/apimachinery/pkg/apis/meta/v1/types.go#L319). I don't yet understand the cases under which this is used, but likely we want to delete the child object and allow the owner to do whatever is the default (create another pod, etc.)

Thinking:

- We can allow trying to schedule jobs in the future, although I'm not sure about that use case (add label to do this)
- fluxion is deriving the nodes on its own, but we might get updated nodes from the scheduler. It might be good to think about how to use the fluxion-service container instead.
- more efficient to retrieve podspec from kubernetes instead of putting into database?

## License

Expand Down
8 changes: 6 additions & 2 deletions docs/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,14 @@ After install (see the [README](../README.md)) you can create any abstraction (p
|------|-------------|---------|
| "fluxnetes.group-name" | The name of the group | fluxnetes-group-<namespace>-<name> |
| "fluxnetes.group-size" | The size of the group | 1 |
| "fluxnetes.duration" | Duration of the job (seconds) | 3600 |

As you might guess, if you specify `fluxnetes` as the scheduler but don't provide any of the above, the defaults are used. This means a single
pod becomes a single member group. If you set the duration to 0, it will be unlimited. If you set a negative value, you'll get an error.
pod becomes a single member group.

### Duration

Any pod object (or PodSpec template) can accept an `activeDeadlineSeconds` and this is how you should set your job time. Note that if you don't set this value, there will
be no duration (and the pod or object can run forever), which is needed for services, etc.

## Design

Expand Down
1 change: 0 additions & 1 deletion examples/job.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ spec:
metadata:
labels:
fluxnetes.group-name: job
fluxnetes.duration: "5"
spec:
schedulerName: fluxnetes
containers:
Expand Down
14 changes: 14 additions & 0 deletions examples/pod.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
apiVersion: v1
kind: Pod
metadata:
name: pod
labels:
fluxnetes.group-name: pod
# fluxnetes.duration: "5"
spec:
activeDeadlineSeconds: 5
schedulerName: fluxnetes
containers:
- name: fruit
image: busybox
command: [sleep, "10"]
54 changes: 34 additions & 20 deletions kubernetes/pkg/fluxnetes/group/group.go
Original file line number Diff line number Diff line change
@@ -1,14 +1,18 @@
package group

import (
"context"
"fmt"
"time"

"strconv"

"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"

corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/fluxnetes/defaults"
"k8s.io/apimachinery/pkg/types"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/fluxnetes/labels"
)

Expand All @@ -18,9 +22,7 @@ type PodGroup struct {
Name string
Size int32
Timestamp metav1.MicroTime

// Duration in seconds
Duration int32
Duration int64
}

// getPodGroupName returns the pod group name
Expand Down Expand Up @@ -61,28 +63,40 @@ func GetPodGroupSize(pod *corev1.Pod) (int32, error) {
return int32(size), nil
}

// GetPodGroupDuration gets the runtime of a job in seconds
// We default to an hour (3600 seconds)
func GetPodGroupDuration(pod *corev1.Pod) (int32, error) {
// AddDuration adds the pod.Spec.ActiveDeadlineSeconds if it isn't set.
func AddDeadline(ctx context.Context, pod *corev1.Pod) error {

// Do we have a group size? This will be parsed as a string, likely
duration, ok := pod.Labels[labels.PodGroupDurationLabel]
if !ok {
duration = "3600"
pod.Labels[labels.PodGroupDurationLabel] = duration
// Cut out early if it is nil - will be added later
if pod.Spec.ActiveDeadlineSeconds == nil {
return nil
}

// We need the group size to be an integer now!
jobDuration, err := strconv.ParseInt(duration, 10, 32)
// Also cut out early with no error if one is set
if *pod.Spec.ActiveDeadlineSeconds > int64(0) {
return nil
}
payload := `{"spec": {"activeDeadlineSeconds": 3600}`
config, err := rest.InClusterConfig()
if err != nil {
return err
}
clientset, err := kubernetes.NewForConfig(config)
if err != nil {
return defaults.DefaultDuration, err
return err
}
_, err = clientset.CoreV1().Pods(pod.Namespace).Patch(ctx, pod.Name, types.MergePatchType, []byte(payload), metav1.PatchOptions{})
return err
}

// GetPodGroupDuration gets the runtime of a job in seconds
// We default to an hour (3600 seconds)
func GetPodGroupDuration(pod *corev1.Pod) (int64, error) {

// The duration cannot be negative
if jobDuration < 0 {
return 0, fmt.Errorf("%s label must be >= 0", labels.PodGroupDurationLabel)
// It is already set
if pod.Spec.ActiveDeadlineSeconds != nil && *pod.Spec.ActiveDeadlineSeconds > int64(0) {
return *pod.Spec.ActiveDeadlineSeconds, nil
}
return int32(jobDuration), nil
// We can't enforce everything have a duration, lots of services should not.
return 0, nil
}

// GetPodCreationTimestamp
Expand Down
3 changes: 0 additions & 3 deletions kubernetes/pkg/fluxnetes/labels/labels.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,6 @@ const (
// We use the same label to be consistent
PodGroupLabel = "fluxnetes.group-name"
PodGroupSizeLabel = "fluxnetes.group-size"

// How long should the group run, in seconds (before cancel)
PodGroupDurationLabel = "fluxnetes.duration"
)

// GetPodGroupLabel get pod group name from pod labels
Expand Down
7 changes: 4 additions & 3 deletions kubernetes/pkg/fluxnetes/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,13 +150,14 @@ func (q *Queue) Enqueue(pod *corev1.Pod) error {
if err != nil {
return err
}
duration, err := groups.GetPodGroupDuration(pod)

// Get the creation timestamp for the group
ts, err := q.GetCreationTimestamp(pod, groupName)
if err != nil {
return err
}

// Get the creation timestamp for the group
ts, err := q.GetCreationTimestamp(pod, groupName)
duration, err := groups.GetPodGroupDuration(pod)
if err != nil {
return err
}
Expand Down
84 changes: 76 additions & 8 deletions kubernetes/pkg/fluxnetes/strategy/workers/cleanup.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package workers

import (
"context"
"encoding/json"
"fmt"
"time"

Expand All @@ -10,6 +11,11 @@ import (

"google.golang.org/grpc"

corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"

klog "k8s.io/klog/v2"

"k8s.io/kubernetes/pkg/scheduler/framework/plugins/fluxnetes/defaults"
Expand All @@ -22,6 +28,7 @@ type CleanupArgs struct {
// We don't need to know this, but it's nice for the user to see
GroupName string `json:"groupName"`
FluxID int64 `json:"fluxid"`
Podspec string `json:"podspec"`

// Do we need to cleanup Kubernetes too?
Kubernetes bool `json:"kubernetes"`
Expand All @@ -38,7 +45,8 @@ type CleanupWorker struct {
func SubmitCleanup(
ctx context.Context,
pool *pgxpool.Pool,
seconds int32,
seconds *int64,
podspec string,
fluxID int64,
inKubernetes bool,
tags []string,
Expand All @@ -58,15 +66,15 @@ func SubmitCleanup(

// Create scheduledAt time - N seconds from now
now := time.Now()
scheduledAt := now.Add(time.Second * time.Duration(seconds))
scheduledAt := now.Add(time.Second * time.Duration(*seconds))

insertOpts := river.InsertOpts{
MaxAttempts: defaults.MaxAttempts,
Tags: tags,
Queue: "cancel_queue",
ScheduledAt: scheduledAt,
}
_, err = client.InsertTx(ctx, tx, CleanupArgs{FluxID: fluxID, Kubernetes: inKubernetes}, &insertOpts)
_, err = client.InsertTx(ctx, tx, CleanupArgs{FluxID: fluxID, Kubernetes: inKubernetes, Podspec: podspec}, &insertOpts)
if err != nil {
return err
}
Expand All @@ -77,10 +85,74 @@ func SubmitCleanup(
return nil
}

// Work performs the Cancel action
// deleteObjects cleans up (deletes) Kubernetes objects
// We do this before the call to fluxion so we can be sure the
// cluster object resources are freed first
func deleteObjects(ctx context.Context, job *river.Job[CleanupArgs]) error {
config, err := rest.InClusterConfig()
if err != nil {
return err
}
clientset, err := kubernetes.NewForConfig(config)
if err != nil {
return err
}

// Serialize the podspec back to a pod
var pod corev1.Pod
err = json.Unmarshal([]byte(job.Args.Podspec), &pod)
if err != nil {
return err
}

// If we only have the pod (no owner references) we can just delete it.
if len(pod.ObjectMeta.OwnerReferences) == 0 {
klog.Infof("Single pod cleanup for %s/%s", pod.Namespace, pod.Name)
deletePolicy := metav1.DeletePropagationForeground
opts := metav1.DeleteOptions{PropagationPolicy: &deletePolicy}
return clientset.CoreV1().Pods(pod.Namespace).Delete(ctx, pod.Name, opts)
}

// If we get here, we are deleting an owner. It can (for now) be: job
// We can add other types as they come in!
for _, owner := range pod.ObjectMeta.OwnerReferences {
klog.Infof("Pod %s/%s has owner %s with UID %s", pod.Namespace, pod.Name, owner.Kind, owner.UID)
if owner.Kind == "Job" {
return deleteJob(ctx, pod.Namespace, clientset, owner)
}
// Important: need to figure out what to do with BlockOwnerDeletion
// https://github.com/kubernetes/kubernetes/blob/master/staging/src/k8s.io/apimachinery/pkg/apis/meta/v1/types.go#L319
}
return nil
}

// deleteJob handles deletion of a Job
func deleteJob(ctx context.Context, namespace string, client kubernetes.Interface, owner metav1.OwnerReference) error {
job, err := client.BatchV1().Jobs(namespace).Get(ctx, owner.Name, metav1.GetOptions{})
if err != nil {
return err
}
klog.Infof("Found job %s/%s", job.Namespace, job.Name)

// This needs to be background for pods
deletePolicy := metav1.DeletePropagationBackground
opts := metav1.DeleteOptions{PropagationPolicy: &deletePolicy}
return client.BatchV1().Jobs(namespace).Delete(ctx, job.Name, opts)
}

// Work performs the Cancel action, first cancelling in Kubernetes (if needed)
// and then cancelling in fluxion.
func (w CleanupWorker) Work(ctx context.Context, job *river.Job[CleanupArgs]) error {
klog.Infof("[CLEANUP-WORKER-START] Cleanup (cancel) running for jobid %s", job.Args.FluxID)

// First attempt cleanup in the cluster, only if in Kubernetes
if job.Args.Kubernetes {
err := deleteObjects(ctx, job)
if err != nil {
return err
}
}

// Connect to the Fluxion service. Returning an error means we retry
// see: https://riverqueue.com/docs/job-retries
conn, err := grpc.Dial("127.0.0.1:4242", grpc.WithInsecure())
Expand All @@ -106,10 +178,6 @@ func (w CleanupWorker) Work(ctx context.Context, job *river.Job[CleanupArgs]) er
klog.Errorf("[Fluxnetes] Issue with cancel %s %s", response.Error, err)
return err
}

// Collect rows into single result
// pgx.CollectRows(rows, pgx.RowTo[string])
// klog.Infof("Values: %s", values)
klog.Infof("[CLEANUP-WORKER-COMPLETE] for group %s (flux job id %d)",
job.Args.GroupName, job.Args.FluxID)
return nil
Expand Down
11 changes: 7 additions & 4 deletions kubernetes/pkg/fluxnetes/strategy/workers/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,10 +153,13 @@ func (w JobWorker) Work(ctx context.Context, job *river.Job[JobArgs]) error {
}
defer rows.Close()

// Kick off a cleaning job for when everyting should be cancelled
err = SubmitCleanup(ctx, pool, job.Args.Duration, int64(fluxID), true, []string{})
if err != nil {
return err
// Kick off a cleaning job for when everyting should be cancelled, but only if
// there is a deadline set. We can't set a deadline for services, etc.
if job.Args.Duration > 0 {
err = SubmitCleanup(ctx, pool, pod.Spec.ActiveDeadlineSeconds, job.Args.Podspec, int64(fluxID), true, []string{})
if err != nil {
return err
}
}

// Collect rows into single result
Expand Down

0 comments on commit 915ed2d

Please sign in to comment.