fluxion-grpc: add support for boolean to not error on no exist
Currently, this variable is not exposed via the gRPC API. We want to
expose it so it can be customized, and to be explicit that we do not
want fluxion to return an error if the job id does not exist.

Signed-off-by: vsoch <[email protected]>
vsoch committed Aug 4, 2024
1 parent fa3d2e3 commit 2c098c7
Showing 8 changed files with 84 additions and 51 deletions.
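
The net effect of the commit: the cancel RPC gains a NoExistOK boolean, and the cleanup worker sets it so that cancelling an already-gone job is not treated as an error. A minimal client-side sketch of the new request shape; the module import path, service client constructor, and server address here are assumptions for illustration, not taken from this commit:

```go
package main

import (
	"context"
	"log"
	"time"

	"google.golang.org/grpc"
	"google.golang.org/grpc/credentials/insecure"

	// Hypothetical import path for the generated fluxion-grpc package.
	pb "example.com/fluxnetes/pkg/fluxion-grpc"
)

func main() {
	// Address and client constructor name are assumptions for this sketch.
	conn, err := grpc.NewClient("localhost:4242", grpc.WithTransportCredentials(insecure.NewCredentials()))
	if err != nil {
		log.Fatalf("connect: %v", err)
	}
	defer conn.Close()
	client := pb.NewFluxionServiceClient(conn)

	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
	defer cancel()

	// NoExistOK asks fluxion not to error when the job id is already gone.
	if _, err := client.Cancel(ctx, &pb.CancelRequest{FluxID: 42, NoExistOK: true}); err != nil {
		log.Printf("cancel failed: %v", err)
	}
}
```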
6 changes: 5 additions & 1 deletion README.md
@@ -183,11 +183,15 @@ SELECT group_name, group_size from pods_provisional;
- fcfs can work by adding only one job (the first in provisional) to the worker queue at a time, and only when it's empty! lol.
- [ ] create state diagram that shows how stuff works
- [ ] Decide what to do on events - currently we delete / cleanup when there is a decided timeout for pod/job
- Arguably, we need to respond to these events for services, etc., where a cleanup job is not scheduled.
- This means we need a way to issue cancel to fluxion, and for fluxion to distinguish between 404 and another error.
- [ ] When a job is not able to schedule, it should go into a rejected queue, which should finish and return a NOT SCHEDULABLE status.
- [ ] In cleanup we will need to handle [BlockOwnerDeletion](https://github.com/kubernetes/kubernetes/blob/dbc2b0a5c7acc349ea71a14e49913661eaf708d2/staging/src/k8s.io/apimachinery/pkg/apis/meta/v1/types.go#L319). I don't yet understand the cases under which this is used, but likely we want to delete the child object and allow the owner to do whatever is the default (create another pod, etc.)

Thinking:

- How do we distinguish between a cancel to fluxion (error) vs. error because it was already cancelled?
- How would that happen?
- Need to walk through the deletion / update process - right now we have a cleanup event if there is a termination time, otherwise we wait for the pod event via the informer
- We can allow trying to schedule jobs in the future, although I'm not sure about that use case (add label to do this)
- What should we do if a pod is updated, and the group is removed?
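
The first question in the list above, telling a real cancel failure apart from a job that was already cancelled, could in principle be answered by inspecting gRPC status codes. A minimal sketch, assuming the server mapped missing job ids to codes.NotFound (fluxion does not guarantee this today, which is why this commit adds the explicit NoExistOK flag instead):

```go
package main

import (
	"fmt"

	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/status"
)

// alreadyCancelled reports whether a cancel error only means the job is gone.
// Assumes the server returns codes.NotFound for missing job ids, which the
// current fluxion bindings do not guarantee - hence the NoExistOK flag.
func alreadyCancelled(err error) bool {
	st, ok := status.FromError(err)
	return ok && st.Code() == codes.NotFound
}

func main() {
	err := status.Error(codes.NotFound, "job does not exist")
	fmt.Println(alreadyCancelled(err)) // true: safe to treat as success
}
```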
@@ -213,4 +217,4 @@ The original fluence code (for which some partial is here) is covered under [LIC

SPDX-License-Identifier: Apache-2.0

LLNL-CODE-764420
1 change: 0 additions & 1 deletion examples/pod.yaml
@@ -4,7 +4,6 @@ metadata:
name: pod
labels:
fluxnetes.group-name: pod
# fluxnetes.duration: "5"
spec:
activeDeadlineSeconds: 5
schedulerName: fluxnetes
13 changes: 11 additions & 2 deletions kubernetes/pkg/fluxnetes/events.go
@@ -5,7 +5,7 @@ import (
klog "k8s.io/klog/v2"
)

// UpdatePod is called on an update, and the old and new object are presented
// UpdatePodEvent is called on an update, and the old and new object are presented
func (q *Queue) UpdatePodEvent(oldObj, newObj interface{}) {

pod := oldObj.(*corev1.Pod)
@@ -29,7 +29,9 @@ func (q *Queue) UpdatePodEvent(oldObj, newObj interface{}) {
}
}

// DeletePod handles the delete event handler
// DeletePodEvent handles the delete event
// We don't need to worry about calling cancel to fluxion if the fluxid is already cleaned up
// It has a boolean that won't return an error if the job does not exist.
func (q *Queue) DeletePodEvent(podObj interface{}) {
pod := podObj.(*corev1.Pod)

@@ -40,6 +42,13 @@ func (q *Queue) DeletePodEvent(podObj interface{}) {
klog.Infof("Received delete event 'Running' for pod %s/%s", pod.Namespace, pod.Name)
case corev1.PodSucceeded:
klog.Infof("Received delete event 'Succeeded' for pod %s/%s", pod.Namespace, pod.Name)
// TODO: insert submit cleanup here - need a way to get the fluxID
// Likely we can keep around the group name and flux id in a database, and get / delete from there.
// err = SubmitCleanup(ctx, pool, pod.Spec.ActiveDeadlineSeconds, job.Args.Podspec, int64(fluxID), true, []string{})
// if err != nil {
//	klog.Errorf("Issue cleaning up deleted pod: %s", err)
// }
case corev1.PodFailed:
klog.Infof("Received delete event 'Failed' for pod %s/%s", pod.Namespace, pod.Name)
case corev1.PodUnknown:
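
A hypothetical sketch of the wiring the TODO above describes: look up the flux id persisted at schedule time, then submit cleanup with the same does-not-exist tolerance. getFluxID, the q.Pool field, and the empty podspec argument are assumptions, not part of this commit; only SubmitCleanup's signature comes from the commented-out call:

```go
// Hypothetical sketch for the TODO in DeletePodEvent above.
func (q *Queue) cleanupDeletedPod(ctx context.Context, pod *corev1.Pod) error {
	groupName := pod.Labels["fluxnetes.group-name"]

	// Assumed helper: fetch the flux id stored for this group at schedule time.
	fluxID, err := q.getFluxID(ctx, groupName)
	if err != nil {
		return err
	}

	// The final arguments mirror the commented-out call: `true` means a
	// missing flux job id should not be treated as an error (NoExistOK).
	return SubmitCleanup(ctx, q.Pool, pod.Spec.ActiveDeadlineSeconds, "", int64(fluxID), true, []string{})
}
```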
6 changes: 3 additions & 3 deletions kubernetes/pkg/fluxnetes/group/group.go
@@ -63,7 +63,7 @@ func GetPodGroupSize(pod *corev1.Pod) (int32, error) {
return int32(size), nil
}

// AddDuration adds the pod.Spec.ActiveDeadlineSeconds if it isn't set.
// AddDeadline adds the pod.Spec.ActiveDeadlineSeconds if it isn't set.
func AddDeadline(ctx context.Context, pod *corev1.Pod) error {

// Cut out early if it is nil - will be added later
@@ -88,7 +88,7 @@ func AddDeadline(ctx context.Context, pod *corev1.Pod) error {
}

// GetPodGroupDuration gets the runtime of a job in seconds
// We default to an hour (3600 seconds)
// We default to 0, no limit, to allow for services, etc.
func GetPodGroupDuration(pod *corev1.Pod) (int64, error) {

// It is already set
@@ -99,7 +99,7 @@ func AddDeadline(ctx context.Context, pod *corev1.Pod) error {
return 0, nil
}

// GetPodCreationTimestamp
// GetPodCreationTimestamp returns the creation timestamp as a MicroTime
func GetPodCreationTimestamp(pod *corev1.Pod) metav1.MicroTime {

// This is the first member of the group - use its CreationTimestamp
13 changes: 9 additions & 4 deletions kubernetes/pkg/fluxnetes/strategy/workers/cleanup.go
@@ -148,10 +148,9 @@ func (w CleanupWorker) Work(ctx context.Context, job *river.Job[CleanupArgs]) er
// First attempt cleanup in the cluster, only if in Kubernetes
if job.Args.Kubernetes {
err := deleteObjects(ctx, job)
if errors.IsNotFound(err) {
return nil
}
if err != nil {

// The job might have been deleted another way
if err != nil && !errors.IsNotFound(err) {
return err
}
}
@@ -171,11 +170,17 @@ func (w CleanupWorker) Work(ctx context.Context, job *river.Job[CleanupArgs]) er
defer cancel()

// Prepare the request to cancel
// https://github.com/flux-framework/flux-sched/blob/master/resource/reapi/bindings/go/src/fluxcli/reapi_cli.go#L226
request := &pb.CancelRequest{
FluxID: uint64(job.Args.FluxID),

// Don't return an error if the job id does not exist (see the binding linked above)
NoExistOK: true,
}

// Assume if there is an error we should try again
// TODO(vsoch): How do we distinguish between a cancel error
// and a job that was possibly already cancelled?
response, err := fluxion.Cancel(fluxionCtx, request)
if err != nil {
klog.Errorf("[Fluxnetes] Issue with cancel %s %s", response.Error, err)
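
The rewritten block above collapses the two error checks into the standard apimachinery idiom: treat NotFound as success, surface everything else. A self-contained sketch of the same pattern with client-go (the clientset is passed in; its construction is omitted):

```go
package cleanup

import (
	"context"
	"fmt"

	"k8s.io/apimachinery/pkg/api/errors"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/kubernetes"
)

// deletePodTolerant mirrors the cleanup worker above: a pod that is already
// gone counts as a successful delete, any other failure is returned.
func deletePodTolerant(ctx context.Context, clientset kubernetes.Interface, namespace, name string) error {
	err := clientset.CoreV1().Pods(namespace).Delete(ctx, name, metav1.DeleteOptions{})
	if err != nil && !errors.IsNotFound(err) {
		return fmt.Errorf("deleting pod %s/%s: %w", namespace, name, err)
	}
	return nil
}
```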
87 changes: 49 additions & 38 deletions src/fluxnetes/pkg/fluxion-grpc/fluxion.pb.go

Generated file; diff not rendered.

2 changes: 2 additions & 0 deletions src/fluxnetes/pkg/fluxion-grpc/fluxion.proto
@@ -50,6 +50,8 @@ message MatchResponse {

message CancelRequest {
uint64 fluxID = 1;
// It's ok if it doesn't exist (don't issue an error)
bool NoExistOK = 2;
}

// The Match response message
7 changes: 5 additions & 2 deletions src/fluxnetes/pkg/fluxion/fluxion.go
@@ -45,10 +45,13 @@ func (fluxion *Fluxion) InitFluxion(policy, label string) {
}

// Cancel wraps the Cancel function of the fluxion go bindings
func (fluxion *Fluxion) Cancel(ctx context.Context, in *pb.CancelRequest) (*pb.CancelResponse, error) {
func (fluxion *Fluxion) Cancel(
ctx context.Context,
in *pb.CancelRequest,
) (*pb.CancelResponse, error) {

klog.Infof("[Fluxnetes] received cancel request %v\n", in)
err := fluxion.cli.Cancel(int64(in.FluxID), true)
err := fluxion.cli.Cancel(int64(in.FluxID), in.NoExistOK)
if err != nil {
return nil, err
}
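
A sketch of how the new behavior could be exercised end to end: cancelling the same id twice should fail without the flag and succeed with it. The newTestFluxion fixture, the import path, and the allocated job id are assumptions for the sketch:

```go
package fluxion

import (
	"context"
	"testing"

	// Hypothetical import path for the generated fluxion-grpc package.
	pb "example.com/fluxnetes/pkg/fluxion-grpc"
)

// Hypothetical test for the Cancel wrapper above. newTestFluxion is an
// assumed fixture that returns a Fluxion with one allocated job (id 1).
func TestCancelNoExistOK(t *testing.T) {
	fluxion := newTestFluxion(t)
	ctx := context.Background()

	// First cancel removes the job.
	if _, err := fluxion.Cancel(ctx, &pb.CancelRequest{FluxID: 1}); err != nil {
		t.Fatalf("first cancel: %v", err)
	}

	// Without NoExistOK, cancelling the now-missing job should error.
	if _, err := fluxion.Cancel(ctx, &pb.CancelRequest{FluxID: 1}); err == nil {
		t.Fatal("expected an error cancelling a missing job without NoExistOK")
	}

	// With NoExistOK, the same call is tolerated.
	if _, err := fluxion.Cancel(ctx, &pb.CancelRequest{FluxID: 1, NoExistOK: true}); err != nil {
		t.Fatalf("cancel with NoExistOK: %v", err)
	}
}
```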
