diff --git a/README.md b/README.md index f650fe9..907c87b 100644 --- a/README.md +++ b/README.md @@ -182,15 +182,21 @@ SELECT group_name, group_size from pods_provisional; - [ ] implement other queue strategies (fcfs and backfill with > 1 reservation depth) - fcfs can work by only adding one job (first in provisional) to the worker queue at once, only when it's empty! lol. - [ ] create state diagram that shows how stuff works -- [ ] When a job is allocated, we likely need to submit a cancel job that will ensure it can be cancelled when the time runs out - - [x] add the label for the job timeout, default to one hour - - [x] cleanup job is triggered after duration - - [ ] issue cancel to fluxion and delete pods up to parent (how to do this)? +- [ ] Decide what to do on events - currently we delete / cleanup only when there is a defined timeout for the pod/job + - Arguably, we need to respond to these events for services, etc., where a cleanup job is not scheduled. + - This means we need a way to issue a cancel to fluxion, and for fluxion to distinguish between a 404 and another error (see the sketch after this list). - [ ] When a job is not able to schedule, it should go into a rejected queue, which should finish and return a NOT SCHEDULABLE status. +- [ ] In cleanup we will need to handle [BlockOwnerDeletion](https://github.com/kubernetes/kubernetes/blob/dbc2b0a5c7acc349ea71a14e49913661eaf708d2/staging/src/k8s.io/apimachinery/pkg/apis/meta/v1/types.go#L319). I don't yet understand the cases under which this is used, but likely we want to delete the child object and allow the owner to do whatever is the default (create another pod, etc.). Thinking: +- How do we distinguish between a cancel to fluxion that fails with a real error vs. one that fails only because the job was already cancelled? + - How would that happen? +- Need to walk through the deletion / update process - right now we schedule a cleanup event if there is a termination time, otherwise we wait for the informer to deliver a pod event - We can allow trying to schedule jobs in the future, although I'm not sure about that use case (add label to do this) +- What should we do if a pod is updated, and the group is removed? +- Fluxion is deriving the nodes on its own, but we might get updated nodes from the scheduler. It might be good to think about how to use the fluxion-service container instead. +- Would it be more efficient to retrieve the podspec from Kubernetes instead of storing it in the database?
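As a rough illustration of the cancel path discussed in the list above, here is a minimal sketch of how a caller might issue a cancel through the fluxion gRPC service and treat an already-cancelled (404-style) job as success, using the `NoExistOK` field added in this diff. It assumes the standard protoc-gen-go-grpc client (`FluxionServiceClient`) generated from `fluxion.proto`; the helper name `cancelFluxJob` is hypothetical.

```go
package fluxnetes

import (
	"context"

	pb "github.com/converged-computing/fluxnetes/pkg/fluxion-grpc"
)

// cancelFluxJob is a hypothetical helper: it asks fluxion to cancel a job and,
// by setting NoExistOK, does not treat "job id not found" as an error (for
// example, when the cleanup worker already cancelled it).
func cancelFluxJob(ctx context.Context, client pb.FluxionServiceClient, fluxID int64) error {
	request := &pb.CancelRequest{
		FluxID: uint64(fluxID),
		// Don't error if fluxion no longer knows about this job
		NoExistOK: true,
	}
	_, err := client.Cancel(ctx, request)
	return err
}
```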
## License @@ -211,4 +217,4 @@ The original fluence code (for which some partial is here) is covered under [LIC SPDX-License-Identifier: Apache-2.0 -LLNL-CODE-764420 \ No newline at end of file +LLNL-CODE-764420 diff --git a/chart/templates/rbac.yaml b/chart/templates/rbac.yaml index 0adaf84..a93b090 100644 --- a/chart/templates/rbac.yaml +++ b/chart/templates/rbac.yaml @@ -1,7 +1,7 @@ apiVersion: rbac.authorization.k8s.io/v1 kind: ClusterRole metadata: - name: scheduler-plugins-scheduler + name: {{ .Values.scheduler.name }} rules: - apiGroups: [""] resources: ["namespaces"] @@ -26,6 +26,9 @@ rules: - apiGroups: [""] resources: ["nodes"] verbs: ["get", "list", "watch", "patch"] +- apiGroups: ["batch"] + resources: ["jobs"] + verbs: ["delete", "get", "list", "watch", "update"] - apiGroups: [""] resources: ["pods"] verbs: ["delete", "get", "list", "watch", "update"] @@ -80,15 +83,29 @@ rules: kind: ClusterRoleBinding apiVersion: rbac.authorization.k8s.io/v1 metadata: - name: scheduler-plugins-scheduler + name: {{ .Values.scheduler.name }} roleRef: apiGroup: rbac.authorization.k8s.io kind: ClusterRole - name: scheduler-plugins-scheduler + name: {{ .Values.scheduler.name }} +subjects: +- kind: ServiceAccount + name: {{ .Values.scheduler.name }} + namespace: {{ .Release.Namespace }} +--- +kind: RoleBinding +apiVersion: rbac.authorization.k8s.io/v1 +metadata: + name: {{ .Values.scheduler.name }} + namespace: {{ .Release.Namespace }} subjects: - kind: ServiceAccount name: {{ .Values.scheduler.name }} namespace: {{ .Release.Namespace }} +roleRef: + apiGroup: rbac.authorization.k8s.io + kind: Role + name: {{ .Values.scheduler.name }} --- kind: ClusterRole apiVersion: rbac.authorization.k8s.io/v1 diff --git a/docs/README.md b/docs/README.md index 7fe9c1e..96e99dc 100644 --- a/docs/README.md +++ b/docs/README.md @@ -8,10 +8,14 @@ After install (see the [README](../README.md)) you can create any abstraction (p |------|-------------|---------| | "fluxnetes.group-name" | The name of the group | fluxnetes-group-- | | "fluxnetes.group-size" | The size of the group | 1 | -| "fluxnetes.duration" | Duration of the job (seconds) | 3600 | As you might guess, if you specify `fluxnetes` as the scheduler but don't provide any of the above, the defaults are used. This means a single -pod becomes a single member group. If you set the duration to 0, it will be unlimited. If you set a negative value, you'll get an error. +pod becomes a single member group. + +### Duration + +Any pod object (or PodSpec template) can accept an `activeDeadlineSeconds`, and this is how you should set your job time. Note that if you don't set this value, there will +be no duration (the pod or object can run forever), which is what services and other long-running objects need.
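To make the rule above concrete, here is a small sketch (not part of the diff) of how the scheduler side derives a duration from `activeDeadlineSeconds`, mirroring the `GetPodGroupDuration` logic added in this changeset: a positive value becomes the group duration, anything else means no limit. The helper name `durationFor` is hypothetical.

```go
package group

import (
	corev1 "k8s.io/api/core/v1"
)

// durationFor mirrors GetPodGroupDuration: a positive
// pod.Spec.ActiveDeadlineSeconds is used as the duration (in seconds), while a
// nil or non-positive value means 0, i.e. no limit (services, etc.).
func durationFor(pod *corev1.Pod) int64 {
	if pod.Spec.ActiveDeadlineSeconds != nil && *pod.Spec.ActiveDeadlineSeconds > 0 {
		return *pod.Spec.ActiveDeadlineSeconds
	}
	return 0
}
```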
## Design diff --git a/examples/job-deadline.yaml b/examples/job-deadline.yaml new file mode 100644 index 0000000..29401cd --- /dev/null +++ b/examples/job-deadline.yaml @@ -0,0 +1,15 @@ +apiVersion: batch/v1 +kind: Job +metadata: + name: job-deadline +spec: + template: + spec: + schedulerName: fluxnetes + activeDeadlineSeconds: 5 + containers: + - name: job + image: busybox + command: [sleep, "10"] + restartPolicy: Never + backoffLimit: 4 diff --git a/examples/job.yaml b/examples/job.yaml index 71ccc65..43f2d0e 100644 --- a/examples/job.yaml +++ b/examples/job.yaml @@ -7,7 +7,6 @@ spec: metadata: labels: fluxnetes.group-name: job - fluxnetes.duration: "5" spec: schedulerName: fluxnetes containers: diff --git a/examples/pod.yaml b/examples/pod.yaml new file mode 100644 index 0000000..5e5a3a6 --- /dev/null +++ b/examples/pod.yaml @@ -0,0 +1,13 @@ +apiVersion: v1 +kind: Pod +metadata: + name: pod + labels: + fluxnetes.group-name: pod +spec: + activeDeadlineSeconds: 5 + schedulerName: fluxnetes + containers: + - name: fruit + image: busybox + command: [sleep, "10"] diff --git a/kubernetes/pkg/fluxnetes/events.go b/kubernetes/pkg/fluxnetes/events.go new file mode 100644 index 0000000..d3a909e --- /dev/null +++ b/kubernetes/pkg/fluxnetes/events.go @@ -0,0 +1,60 @@ +package fluxnetes + +import ( + corev1 "k8s.io/api/core/v1" + klog "k8s.io/klog/v2" +) + +// UpdatePodEvent is called on an update, and the old and new objects are presented +func (q *Queue) UpdatePodEvent(oldObj, newObj interface{}) { + + pod := oldObj.(*corev1.Pod) + newPod := newObj.(*corev1.Pod) + + // a pod is updated, get the group. TODO: how to handle change in group name? + // groupName := groups.GetPodGroupName(oldPod) + switch pod.Status.Phase { + case corev1.PodPending: + klog.Infof("Received update event 'Pending' to '%s' for pod %s/%s", newPod.Status.Phase, pod.Namespace, pod.Name) + case corev1.PodRunning: + klog.Infof("Received update event 'Running' to '%s' for pod %s/%s", newPod.Status.Phase, pod.Namespace, pod.Name) + case corev1.PodSucceeded: + klog.Infof("Received update event 'Succeeded' to '%s' for pod %s/%s", newPod.Status.Phase, pod.Namespace, pod.Name) + case corev1.PodFailed: + klog.Infof("Received update event 'Failed' to '%s' for pod %s/%s", newPod.Status.Phase, pod.Namespace, pod.Name) + case corev1.PodUnknown: + klog.Infof("Received update event 'Unknown' to '%s' for pod %s/%s", newPod.Status.Phase, pod.Namespace, pod.Name) + default: + klog.Infof("Received unknown update event from '%s' to '%s' for pod %s/%s", pod.Status.Phase, newPod.Status.Phase, pod.Namespace, pod.Name) + } +} + +// DeletePodEvent handles the delete pod event. +// We don't need to worry about calling cancel to fluxion if the fluxid is already cleaned up - +// it has a boolean that won't return an error if the job does not exist. +func (q *Queue) DeletePodEvent(podObj interface{}) { + pod := podObj.(*corev1.Pod) + + switch pod.Status.Phase { + case corev1.PodPending: + klog.Infof("Received delete event 'Pending' for pod %s/%s", pod.Namespace, pod.Name) + case corev1.PodRunning: + klog.Infof("Received delete event 'Running' for pod %s/%s", pod.Namespace, pod.Name) + case corev1.PodSucceeded: + klog.Infof("Received delete event 'Succeeded' for pod %s/%s", pod.Namespace, pod.Name) + // TODO insert submit cleanup here - need a way to get the fluxId + // Likely we can keep around the group name and flux id in a database, and get / delete from there.
+ // err = SubmitCleanup(ctx, pool, pod.Spec.ActiveDeadlineSeconds, job.Args.Podspec, int64(fluxID), true, []string{}) + // if err != nil { + // klog.Errorf("Issue cleaning up deleted pod: %s", err) + // } + case corev1.PodFailed: + klog.Infof("Received delete event 'Failed' for pod %s/%s", pod.Namespace, pod.Name) + case corev1.PodUnknown: + klog.Infof("Received delete event 'Unknown' for pod %s/%s", pod.Namespace, pod.Name) + default: + klog.Infof("Received unknown delete event %s for pod %s/%s", pod.Status.Phase, pod.Namespace, pod.Name) + } + +} diff --git a/kubernetes/pkg/fluxnetes/group/group.go b/kubernetes/pkg/fluxnetes/group/group.go index c95453b..4182c8d 100644 --- a/kubernetes/pkg/fluxnetes/group/group.go +++ b/kubernetes/pkg/fluxnetes/group/group.go @@ -1,14 +1,18 @@ package group import ( + "context" "fmt" "time" "strconv" + "k8s.io/client-go/kubernetes" + "k8s.io/client-go/rest" + corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/kubernetes/pkg/scheduler/framework/plugins/fluxnetes/defaults" + "k8s.io/apimachinery/pkg/types" "k8s.io/kubernetes/pkg/scheduler/framework/plugins/fluxnetes/labels" ) @@ -18,9 +22,7 @@ type PodGroup struct { Name string Size int32 Timestamp metav1.MicroTime - - // Duration in seconds - Duration int32 + Duration int64 } // getPodGroupName returns the pod group name @@ -61,31 +63,43 @@ func GetPodGroupSize(pod *corev1.Pod) (int32, error) { return int32(size), nil } -// GetPodGroupDuration gets the runtime of a job in seconds -// We default to an hour (3600 seconds) -func GetPodGroupDuration(pod *corev1.Pod) (int32, error) { +// AddDeadline patches a default pod.Spec.ActiveDeadlineSeconds (3600 seconds) onto the pod +// when the value is set but not positive; a nil value is left alone (no deadline). +func AddDeadline(ctx context.Context, pod *corev1.Pod) error { - // Do we have a group size? This will be parsed as a string, likely - duration, ok := pod.Labels[labels.PodGroupDurationLabel] - if !ok { - duration = "3600" - pod.Labels[labels.PodGroupDurationLabel] = duration + // Cut out early if it is nil - will be added later + if pod.Spec.ActiveDeadlineSeconds == nil { + return nil } - - // We need the group size to be an integer now! - jobDuration, err := strconv.ParseInt(duration, 10, 32) + // Also cut out early with no error if one is set + if *pod.Spec.ActiveDeadlineSeconds > int64(0) { + return nil + } + payload := `{"spec": {"activeDeadlineSeconds": 3600}}` + config, err := rest.InClusterConfig() + if err != nil { + return err + } + clientset, err := kubernetes.NewForConfig(config) if err != nil { - return defaults.DefaultDuration, err + return err } + _, err = clientset.CoreV1().Pods(pod.Namespace).Patch(ctx, pod.Name, types.MergePatchType, []byte(payload), metav1.PatchOptions{}) + return err +} + +// GetPodGroupDuration gets the runtime of a job in seconds +// We default to 0, no limit, to allow for services, etc. +func GetPodGroupDuration(pod *corev1.Pod) (int64, error) { - // The duration cannot be negative - if jobDuration < 0 { - return 0, fmt.Errorf("%s label must be >= 0", labels.PodGroupDurationLabel) + // It is already set + if pod.Spec.ActiveDeadlineSeconds != nil && *pod.Spec.ActiveDeadlineSeconds > int64(0) { + return *pod.Spec.ActiveDeadlineSeconds, nil } - return int32(jobDuration), nil + // We can't enforce that everything has a duration; many services should not have one.
+ return 0, nil } -// GetPodCreationTimestamp +// GetPodCreationTimestamp returns the creation timestamp as a MicroTime func GetPodCreationTimestamp(pod *corev1.Pod) metav1.MicroTime { // This is the first member of the group - use its CreationTimestamp diff --git a/kubernetes/pkg/fluxnetes/labels/labels.go b/kubernetes/pkg/fluxnetes/labels/labels.go index 76a510b..b436cba 100644 --- a/kubernetes/pkg/fluxnetes/labels/labels.go +++ b/kubernetes/pkg/fluxnetes/labels/labels.go @@ -10,9 +10,6 @@ const ( // We use the same label to be consistent PodGroupLabel = "fluxnetes.group-name" PodGroupSizeLabel = "fluxnetes.group-size" - - // How long should the group run, in seconds (before cancel) - PodGroupDurationLabel = "fluxnetes.duration" ) // GetPodGroupLabel get pod group name from pod labels diff --git a/kubernetes/pkg/fluxnetes/queue.go b/kubernetes/pkg/fluxnetes/queue.go index 4b49882..affa822 100644 --- a/kubernetes/pkg/fluxnetes/queue.go +++ b/kubernetes/pkg/fluxnetes/queue.go @@ -12,11 +12,12 @@ import ( "github.com/jackc/pgx/v5" "github.com/jackc/pgx/v5/pgxpool" - "github.com/riverqueue/river" "github.com/riverqueue/river/riverdriver/riverpgxv5" "github.com/riverqueue/river/rivershared/util/slogutil" + "k8s.io/client-go/tools/cache" + "k8s.io/kubernetes/pkg/scheduler/framework" groups "k8s.io/kubernetes/pkg/scheduler/framework/plugins/fluxnetes/group" "k8s.io/kubernetes/pkg/scheduler/framework/plugins/fluxnetes/queries" strategies "k8s.io/kubernetes/pkg/scheduler/framework/plugins/fluxnetes/strategy" @@ -33,6 +34,7 @@ type Queue struct { riverClient *river.Client[pgx.Tx] EventChannel *QueueEvent Strategy strategies.QueueStrategy + Handle framework.Handle // IMPORTANT: subscriptions need to use same context // that client submit them uses @@ -55,7 +57,7 @@ type QueueEvent struct { } // NewQueue starts a new queue with a river client -func NewQueue(ctx context.Context) (*Queue, error) { +func NewQueue(ctx context.Context, handle framework.Handle) (*Queue, error) { dbPool, err := pgxpool.New(ctx, os.Getenv("DATABASE_URL")) if err != nil { return nil, err @@ -105,6 +107,7 @@ func NewQueue(ctx context.Context) (*Queue, error) { Strategy: strategy, Context: ctx, ReservationDepth: depth, + Handle: handle, } queue.setupEvents() return &queue, nil @@ -139,6 +142,21 @@ func (q *Queue) setupEvents() { q.EventChannel = &QueueEvent{Function: trigger, Channel: c} } +// GetInformer returns the pod informer to run as a go routine +func (q *Queue) GetInformer() cache.SharedIndexInformer { + + // Performance improvement when retrieving list of objects by namespace or we'll log 'index not exist' warning. + podsInformer := q.Handle.SharedInformerFactory().Core().V1().Pods().Informer() + podsInformer.AddIndexers(cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}) + + // Event handlers to call on update/delete for cleanup + podsInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{ + UpdateFunc: q.UpdatePodEvent, + DeleteFunc: q.DeletePodEvent, + }) + return podsInformer +} + // Enqueue a new job to the provisional queue // 1. Assemble (discover or define) the group // 2. 
Add to provisional table @@ -150,13 +168,14 @@ func (q *Queue) Enqueue(pod *corev1.Pod) error { if err != nil { return err } - duration, err := groups.GetPodGroupDuration(pod) + + // Get the creation timestamp for the group + ts, err := q.GetCreationTimestamp(pod, groupName) if err != nil { return err } - // Get the creation timestamp for the group - ts, err := q.GetCreationTimestamp(pod, groupName) + duration, err := groups.GetPodGroupDuration(pod) if err != nil { return err } diff --git a/kubernetes/pkg/fluxnetes/strategy/workers/cleanup.go b/kubernetes/pkg/fluxnetes/strategy/workers/cleanup.go index 8a39c2b..c9c829b 100644 --- a/kubernetes/pkg/fluxnetes/strategy/workers/cleanup.go +++ b/kubernetes/pkg/fluxnetes/strategy/workers/cleanup.go @@ -2,13 +2,18 @@ package workers import ( "context" + "encoding/json" "fmt" "time" "github.com/jackc/pgx/v5" "github.com/jackc/pgx/v5/pgxpool" - "google.golang.org/grpc" + corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/client-go/kubernetes" + "k8s.io/client-go/rest" klog "k8s.io/klog/v2" @@ -22,6 +27,7 @@ type CleanupArgs struct { // We don't need to know this, but it's nice for the user to see GroupName string `json:"groupName"` FluxID int64 `json:"fluxid"` + Podspec string `json:"podspec"` // Do we need to cleanup Kubernetes too? Kubernetes bool `json:"kubernetes"` @@ -38,7 +44,8 @@ type CleanupWorker struct { func SubmitCleanup( ctx context.Context, pool *pgxpool.Pool, - seconds int32, + seconds *int64, + podspec string, fluxID int64, inKubernetes bool, tags []string, @@ -58,7 +65,7 @@ func SubmitCleanup( // Create scheduledAt time - N seconds from now now := time.Now() - scheduledAt := now.Add(time.Second * time.Duration(seconds)) + scheduledAt := now.Add(time.Second * time.Duration(*seconds)) insertOpts := river.InsertOpts{ MaxAttempts: defaults.MaxAttempts, @@ -66,7 +73,7 @@ func SubmitCleanup( Queue: "cancel_queue", ScheduledAt: scheduledAt, } - _, err = client.InsertTx(ctx, tx, CleanupArgs{FluxID: fluxID, Kubernetes: inKubernetes}, &insertOpts) + _, err = client.InsertTx(ctx, tx, CleanupArgs{FluxID: fluxID, Kubernetes: inKubernetes, Podspec: podspec}, &insertOpts) if err != nil { return err } @@ -77,10 +84,77 @@ func SubmitCleanup( return nil } -// Work performs the Cancel action +// deleteObjects cleans up (deletes) Kubernetes objects +// We do this before the call to fluxion so we can be sure the +// cluster object resources are freed first +func deleteObjects(ctx context.Context, job *river.Job[CleanupArgs]) error { + config, err := rest.InClusterConfig() + if err != nil { + return err + } + clientset, err := kubernetes.NewForConfig(config) + if err != nil { + return err + } + + // Serialize the podspec back to a pod + var pod corev1.Pod + err = json.Unmarshal([]byte(job.Args.Podspec), &pod) + if err != nil { + return err + } + + // If we only have the pod (no owner references) we can just delete it. + if len(pod.ObjectMeta.OwnerReferences) == 0 { + klog.Infof("Single pod cleanup for %s/%s", pod.Namespace, pod.Name) + deletePolicy := metav1.DeletePropagationForeground + opts := metav1.DeleteOptions{PropagationPolicy: &deletePolicy} + return clientset.CoreV1().Pods(pod.Namespace).Delete(ctx, pod.Name, opts) + } + + // If we get here, we are deleting an owner. It can (for now) be: job + // We can add other types as they come in! 
+ for _, owner := range pod.ObjectMeta.OwnerReferences { + klog.Infof("Pod %s/%s has owner %s with UID %s", pod.Namespace, pod.Name, owner.Kind, owner.UID) + if owner.Kind == "Job" { + return deleteJob(ctx, pod.Namespace, clientset, owner) + } + // Important: need to figure out what to do with BlockOwnerDeletion + // https://github.com/kubernetes/kubernetes/blob/master/staging/src/k8s.io/apimachinery/pkg/apis/meta/v1/types.go#L319 + } + return nil +} + +// deleteJob handles deletion of a Job +func deleteJob(ctx context.Context, namespace string, client kubernetes.Interface, owner metav1.OwnerReference) error { + job, err := client.BatchV1().Jobs(namespace).Get(ctx, owner.Name, metav1.GetOptions{}) + if err != nil { + klog.Infof("Error retrieving job for deletion: %s", err) + return err + } + klog.Infof("Found job %s/%s", job.Namespace, job.Name) + + // This needs to be background for pods + deletePolicy := metav1.DeletePropagationBackground + opts := metav1.DeleteOptions{PropagationPolicy: &deletePolicy} + return client.BatchV1().Jobs(namespace).Delete(ctx, job.Name, opts) +} + +// Work performs the Cancel action, first cancelling in Kubernetes (if needed) +// and then cancelling in fluxion. func (w CleanupWorker) Work(ctx context.Context, job *river.Job[CleanupArgs]) error { klog.Infof("[CLEANUP-WORKER-START] Cleanup (cancel) running for jobid %s", job.Args.FluxID) + // First attempt cleanup in the cluster, only if in Kubernetes + if job.Args.Kubernetes { + err := deleteObjects(ctx, job) + + // The job might have been deleted another way + if err != nil && !errors.IsNotFound(err) { + return err + } + } + // Connect to the Fluxion service. Returning an error means we retry // see: https://riverqueue.com/docs/job-retries conn, err := grpc.Dial("127.0.0.1:4242", grpc.WithInsecure()) @@ -96,20 +170,22 @@ func (w CleanupWorker) Work(ctx context.Context, job *river.Job[CleanupArgs]) er defer cancel() // Prepare the request to cancel + // https://github.com/flux-framework/flux-sched/blob/master/resource/reapi/bindings/go/src/fluxcli/reapi_cli.go#L226 request := &pb.CancelRequest{ FluxID: uint64(job.Args.FluxID), + + // Don't return an error if the job id does not exist (see the fluxcli reference above) + NoExistOK: true, } // Assume if there is an error we should try again + // TODO(vsoch): How do we distinguish between a cancel error + // and a job that was already cancelled? response, err := fluxion.Cancel(fluxionCtx, request) if err != nil { klog.Errorf("[Fluxnetes] Issue with cancel %s %s", response.Error, err) return err } - - // Collect rows into single result - // pgx.CollectRows(rows, pgx.RowTo[string]) - // klog.Infof("Values: %s", values) klog.Infof("[CLEANUP-WORKER-COMPLETE] for group %s (flux job id %d)", job.Args.GroupName, job.Args.FluxID) return nil diff --git a/kubernetes/pkg/fluxnetes/strategy/workers/job.go b/kubernetes/pkg/fluxnetes/strategy/workers/job.go index 96928f9..48267e1 100644 --- a/kubernetes/pkg/fluxnetes/strategy/workers/job.go +++ b/kubernetes/pkg/fluxnetes/strategy/workers/job.go @@ -153,15 +153,14 @@ func (w JobWorker) Work(ctx context.Context, job *river.Job[JobArgs]) error { } defer rows.Close() - // Kick off a cleaning job for when everyting should be cancelled - err = SubmitCleanup(ctx, pool, job.Args.Duration, int64(fluxID), true, []string{}) - if err != nil { - return err + // Kick off a cleaning job for when everything should be cancelled, but only if + // there is a deadline set. We can't set a deadline for services, etc.
+ if job.Args.Duration > 0 { + err = SubmitCleanup(ctx, pool, pod.Spec.ActiveDeadlineSeconds, job.Args.Podspec, int64(fluxID), true, []string{}) + if err != nil { + return err + } } - - // Collect rows into single result - // pgx.CollectRows(rows, pgx.RowTo[string]) - // klog.Infof("Values: %s", values) klog.Infof("[JOB-WORKER-COMPLETE] nodes allocated %s for group %s (flux job id %d)\n", nodeStr, job.Args.GroupName, job.Args.FluxJob) return nil diff --git a/kubernetes/pkg/scheduler/scheduler.go b/kubernetes/pkg/scheduler/scheduler.go index aa0f0ef..d70f950 100644 --- a/kubernetes/pkg/scheduler/scheduler.go +++ b/kubernetes/pkg/scheduler/scheduler.go @@ -460,9 +460,15 @@ func (sched *Scheduler) Run(ctx context.Context) { // SchedulingQueue, in effect causing a deadlock on shutdown. go wait.UntilWithContext(ctx, sched.ScheduleOne, 0) + // Get a handle to the fluxnetes framework + fwk, ok := sched.Profiles["fluxnetes"] + if !ok { + logger.Error(fmt.Errorf("Missing plugin"), "Cannot find fluxnetes plugin") + } + // This is the only added line to start our queue logger.Info("[FLUXNETES]", "Starting", "queue") - queue, err := fluxnetes.NewQueue(ctx) + queue, err := fluxnetes.NewQueue(ctx, fwk) if err != nil { logger.Error(err, "Issue with Fluxnetes queue") } @@ -472,6 +478,9 @@ func (sched *Scheduler) Run(ctx context.Context) { sched.Queue = queue defer sched.Queue.Pool.Close() + // Get and run the informer (update, delete pod events) + go sched.Queue.GetInformer().Run(ctx.Done()) + // Make an empty state for now, just for functions state := framework.NewCycleState() diff --git a/src/fluxnetes/pkg/fluxion-grpc/fluxion.pb.go b/src/fluxnetes/pkg/fluxion-grpc/fluxion.pb.go index d9cc257..3a87845 100644 --- a/src/fluxnetes/pkg/fluxion-grpc/fluxion.pb.go +++ b/src/fluxnetes/pkg/fluxion-grpc/fluxion.pb.go @@ -334,6 +334,8 @@ type CancelRequest struct { unknownFields protoimpl.UnknownFields FluxID uint64 `protobuf:"varint,1,opt,name=fluxID,proto3" json:"fluxID,omitempty"` + // It's ok if it doesn't exist (don't issue an error) + NoExistOK bool `protobuf:"varint,2,opt,name=NoExistOK,proto3" json:"NoExistOK,omitempty"` } func (x *CancelRequest) Reset() { @@ -375,6 +377,13 @@ func (x *CancelRequest) GetFluxID() uint64 { return 0 } +func (x *CancelRequest) GetNoExistOK() bool { + if x != nil { + return x.NoExistOK + } + return false +} + // The Match response message type CancelResponse struct { state protoimpl.MessageState @@ -663,45 +672,47 @@ var file_fluxnetes_pkg_fluxion_grpc_fluxion_proto_rawDesc = []byte{ 0x61, 0x74, 0x18, 0x04, 0x20, 0x01, 0x28, 0x03, 0x52, 0x0a, 0x72, 0x65, 0x73, 0x65, 0x72, 0x76, 0x65, 0x64, 0x41, 0x74, 0x12, 0x1c, 0x0a, 0x09, 0x61, 0x6c, 0x6c, 0x6f, 0x63, 0x61, 0x74, 0x65, 0x64, 0x18, 0x05, 0x20, 0x01, 0x28, 0x08, 0x52, 0x09, 0x61, 0x6c, 0x6c, 0x6f, 0x63, 0x61, 0x74, - 0x65, 0x64, 0x22, 0x27, 0x0a, 0x0d, 0x43, 0x61, 0x6e, 0x63, 0x65, 0x6c, 0x52, 0x65, 0x71, 0x75, + 0x65, 0x64, 0x22, 0x45, 0x0a, 0x0d, 0x43, 0x61, 0x6e, 0x63, 0x65, 0x6c, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x16, 0x0a, 0x06, 0x66, 0x6c, 0x75, 0x78, 0x49, 0x44, 0x18, 0x01, 0x20, - 0x01, 0x28, 0x04, 0x52, 0x06, 0x66, 0x6c, 0x75, 0x78, 0x49, 0x44, 0x22, 0x3e, 0x0a, 0x0e, 0x43, - 0x61, 0x6e, 0x63, 0x65, 0x6c, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x16, 0x0a, - 0x06, 0x66, 0x6c, 0x75, 0x78, 0x49, 0x44, 0x18, 0x01, 0x20, 0x01, 0x28, 0x04, 0x52, 0x06, 0x66, - 0x6c, 0x75, 0x78, 0x49, 0x44, 0x12, 0x14, 0x0a, 0x05, 0x65, 0x72, 0x72, 0x6f, 0x72, 0x18, 0x02, - 0x20, 0x01, 0x28, 0x05, 0x52, 0x05, 
0x65, 0x72, 0x72, 0x6f, 0x72, 0x22, 0xe6, 0x01, 0x0a, 0x0a, - 0x4e, 0x6f, 0x64, 0x65, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x12, 0x1a, 0x0a, 0x08, 0x63, 0x70, - 0x75, 0x41, 0x76, 0x61, 0x69, 0x6c, 0x18, 0x01, 0x20, 0x01, 0x28, 0x05, 0x52, 0x08, 0x63, 0x70, - 0x75, 0x41, 0x76, 0x61, 0x69, 0x6c, 0x12, 0x1a, 0x0a, 0x08, 0x67, 0x70, 0x75, 0x41, 0x76, 0x61, - 0x69, 0x6c, 0x18, 0x02, 0x20, 0x01, 0x28, 0x05, 0x52, 0x08, 0x67, 0x70, 0x75, 0x41, 0x76, 0x61, - 0x69, 0x6c, 0x12, 0x22, 0x0a, 0x0c, 0x73, 0x74, 0x6f, 0x72, 0x61, 0x67, 0x65, 0x41, 0x76, 0x61, - 0x69, 0x6c, 0x18, 0x03, 0x20, 0x01, 0x28, 0x03, 0x52, 0x0c, 0x73, 0x74, 0x6f, 0x72, 0x61, 0x67, - 0x65, 0x41, 0x76, 0x61, 0x69, 0x6c, 0x12, 0x20, 0x0a, 0x0b, 0x6d, 0x65, 0x6d, 0x6f, 0x72, 0x79, - 0x41, 0x76, 0x61, 0x69, 0x6c, 0x18, 0x04, 0x20, 0x01, 0x28, 0x03, 0x52, 0x0b, 0x6d, 0x65, 0x6d, - 0x6f, 0x72, 0x79, 0x41, 0x76, 0x61, 0x69, 0x6c, 0x12, 0x20, 0x0a, 0x0b, 0x61, 0x6c, 0x6c, 0x6f, - 0x77, 0x65, 0x64, 0x50, 0x6f, 0x64, 0x73, 0x18, 0x05, 0x20, 0x01, 0x28, 0x03, 0x52, 0x0b, 0x61, - 0x6c, 0x6c, 0x6f, 0x77, 0x65, 0x64, 0x50, 0x6f, 0x64, 0x73, 0x12, 0x16, 0x0a, 0x06, 0x6e, 0x6f, - 0x64, 0x65, 0x49, 0x50, 0x18, 0x06, 0x20, 0x01, 0x28, 0x09, 0x52, 0x06, 0x6e, 0x6f, 0x64, 0x65, - 0x49, 0x50, 0x12, 0x20, 0x0a, 0x0b, 0x72, 0x65, 0x70, 0x6c, 0x69, 0x63, 0x61, 0x74, 0x69, 0x6f, - 0x6e, 0x18, 0x07, 0x20, 0x01, 0x28, 0x05, 0x52, 0x0b, 0x72, 0x65, 0x70, 0x6c, 0x69, 0x63, 0x61, - 0x74, 0x69, 0x6f, 0x6e, 0x22, 0x1e, 0x0a, 0x0a, 0x4a, 0x47, 0x46, 0x52, 0x65, 0x71, 0x75, 0x65, - 0x73, 0x74, 0x12, 0x10, 0x0a, 0x03, 0x6a, 0x67, 0x66, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, - 0x03, 0x6a, 0x67, 0x66, 0x22, 0x1f, 0x0a, 0x0b, 0x4a, 0x47, 0x46, 0x52, 0x65, 0x73, 0x70, 0x6f, - 0x6e, 0x73, 0x65, 0x12, 0x10, 0x0a, 0x03, 0x6a, 0x67, 0x66, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, - 0x52, 0x03, 0x6a, 0x67, 0x66, 0x32, 0x87, 0x01, 0x0a, 0x0e, 0x46, 0x6c, 0x75, 0x78, 0x69, 0x6f, - 0x6e, 0x53, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x12, 0x38, 0x0a, 0x05, 0x4d, 0x61, 0x74, 0x63, - 0x68, 0x12, 0x15, 0x2e, 0x66, 0x6c, 0x75, 0x78, 0x69, 0x6f, 0x6e, 0x2e, 0x4d, 0x61, 0x74, 0x63, - 0x68, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x16, 0x2e, 0x66, 0x6c, 0x75, 0x78, 0x69, - 0x6f, 0x6e, 0x2e, 0x4d, 0x61, 0x74, 0x63, 0x68, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, - 0x22, 0x00, 0x12, 0x3b, 0x0a, 0x06, 0x43, 0x61, 0x6e, 0x63, 0x65, 0x6c, 0x12, 0x16, 0x2e, 0x66, - 0x6c, 0x75, 0x78, 0x69, 0x6f, 0x6e, 0x2e, 0x43, 0x61, 0x6e, 0x63, 0x65, 0x6c, 0x52, 0x65, 0x71, - 0x75, 0x65, 0x73, 0x74, 0x1a, 0x17, 0x2e, 0x66, 0x6c, 0x75, 0x78, 0x69, 0x6f, 0x6e, 0x2e, 0x43, - 0x61, 0x6e, 0x63, 0x65, 0x6c, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x42, - 0x3b, 0x5a, 0x39, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x63, 0x6f, - 0x6e, 0x76, 0x65, 0x72, 0x67, 0x65, 0x64, 0x2d, 0x63, 0x6f, 0x6d, 0x70, 0x75, 0x74, 0x69, 0x6e, - 0x67, 0x2f, 0x66, 0x6c, 0x75, 0x78, 0x6e, 0x65, 0x74, 0x65, 0x73, 0x2f, 0x70, 0x6b, 0x67, 0x2f, - 0x66, 0x6c, 0x75, 0x78, 0x69, 0x6f, 0x6e, 0x2d, 0x67, 0x72, 0x70, 0x63, 0x62, 0x06, 0x70, 0x72, - 0x6f, 0x74, 0x6f, 0x33, + 0x01, 0x28, 0x04, 0x52, 0x06, 0x66, 0x6c, 0x75, 0x78, 0x49, 0x44, 0x12, 0x1c, 0x0a, 0x09, 0x4e, + 0x6f, 0x45, 0x78, 0x69, 0x73, 0x74, 0x4f, 0x4b, 0x18, 0x02, 0x20, 0x01, 0x28, 0x08, 0x52, 0x09, + 0x4e, 0x6f, 0x45, 0x78, 0x69, 0x73, 0x74, 0x4f, 0x4b, 0x22, 0x3e, 0x0a, 0x0e, 0x43, 0x61, 0x6e, + 0x63, 0x65, 0x6c, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x16, 0x0a, 0x06, 0x66, + 0x6c, 0x75, 0x78, 0x49, 0x44, 0x18, 
0x01, 0x20, 0x01, 0x28, 0x04, 0x52, 0x06, 0x66, 0x6c, 0x75, + 0x78, 0x49, 0x44, 0x12, 0x14, 0x0a, 0x05, 0x65, 0x72, 0x72, 0x6f, 0x72, 0x18, 0x02, 0x20, 0x01, + 0x28, 0x05, 0x52, 0x05, 0x65, 0x72, 0x72, 0x6f, 0x72, 0x22, 0xe6, 0x01, 0x0a, 0x0a, 0x4e, 0x6f, + 0x64, 0x65, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x12, 0x1a, 0x0a, 0x08, 0x63, 0x70, 0x75, 0x41, + 0x76, 0x61, 0x69, 0x6c, 0x18, 0x01, 0x20, 0x01, 0x28, 0x05, 0x52, 0x08, 0x63, 0x70, 0x75, 0x41, + 0x76, 0x61, 0x69, 0x6c, 0x12, 0x1a, 0x0a, 0x08, 0x67, 0x70, 0x75, 0x41, 0x76, 0x61, 0x69, 0x6c, + 0x18, 0x02, 0x20, 0x01, 0x28, 0x05, 0x52, 0x08, 0x67, 0x70, 0x75, 0x41, 0x76, 0x61, 0x69, 0x6c, + 0x12, 0x22, 0x0a, 0x0c, 0x73, 0x74, 0x6f, 0x72, 0x61, 0x67, 0x65, 0x41, 0x76, 0x61, 0x69, 0x6c, + 0x18, 0x03, 0x20, 0x01, 0x28, 0x03, 0x52, 0x0c, 0x73, 0x74, 0x6f, 0x72, 0x61, 0x67, 0x65, 0x41, + 0x76, 0x61, 0x69, 0x6c, 0x12, 0x20, 0x0a, 0x0b, 0x6d, 0x65, 0x6d, 0x6f, 0x72, 0x79, 0x41, 0x76, + 0x61, 0x69, 0x6c, 0x18, 0x04, 0x20, 0x01, 0x28, 0x03, 0x52, 0x0b, 0x6d, 0x65, 0x6d, 0x6f, 0x72, + 0x79, 0x41, 0x76, 0x61, 0x69, 0x6c, 0x12, 0x20, 0x0a, 0x0b, 0x61, 0x6c, 0x6c, 0x6f, 0x77, 0x65, + 0x64, 0x50, 0x6f, 0x64, 0x73, 0x18, 0x05, 0x20, 0x01, 0x28, 0x03, 0x52, 0x0b, 0x61, 0x6c, 0x6c, + 0x6f, 0x77, 0x65, 0x64, 0x50, 0x6f, 0x64, 0x73, 0x12, 0x16, 0x0a, 0x06, 0x6e, 0x6f, 0x64, 0x65, + 0x49, 0x50, 0x18, 0x06, 0x20, 0x01, 0x28, 0x09, 0x52, 0x06, 0x6e, 0x6f, 0x64, 0x65, 0x49, 0x50, + 0x12, 0x20, 0x0a, 0x0b, 0x72, 0x65, 0x70, 0x6c, 0x69, 0x63, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x18, + 0x07, 0x20, 0x01, 0x28, 0x05, 0x52, 0x0b, 0x72, 0x65, 0x70, 0x6c, 0x69, 0x63, 0x61, 0x74, 0x69, + 0x6f, 0x6e, 0x22, 0x1e, 0x0a, 0x0a, 0x4a, 0x47, 0x46, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, + 0x12, 0x10, 0x0a, 0x03, 0x6a, 0x67, 0x66, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x03, 0x6a, + 0x67, 0x66, 0x22, 0x1f, 0x0a, 0x0b, 0x4a, 0x47, 0x46, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, + 0x65, 0x12, 0x10, 0x0a, 0x03, 0x6a, 0x67, 0x66, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x03, + 0x6a, 0x67, 0x66, 0x32, 0x87, 0x01, 0x0a, 0x0e, 0x46, 0x6c, 0x75, 0x78, 0x69, 0x6f, 0x6e, 0x53, + 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x12, 0x38, 0x0a, 0x05, 0x4d, 0x61, 0x74, 0x63, 0x68, 0x12, + 0x15, 0x2e, 0x66, 0x6c, 0x75, 0x78, 0x69, 0x6f, 0x6e, 0x2e, 0x4d, 0x61, 0x74, 0x63, 0x68, 0x52, + 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x16, 0x2e, 0x66, 0x6c, 0x75, 0x78, 0x69, 0x6f, 0x6e, + 0x2e, 0x4d, 0x61, 0x74, 0x63, 0x68, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, + 0x12, 0x3b, 0x0a, 0x06, 0x43, 0x61, 0x6e, 0x63, 0x65, 0x6c, 0x12, 0x16, 0x2e, 0x66, 0x6c, 0x75, + 0x78, 0x69, 0x6f, 0x6e, 0x2e, 0x43, 0x61, 0x6e, 0x63, 0x65, 0x6c, 0x52, 0x65, 0x71, 0x75, 0x65, + 0x73, 0x74, 0x1a, 0x17, 0x2e, 0x66, 0x6c, 0x75, 0x78, 0x69, 0x6f, 0x6e, 0x2e, 0x43, 0x61, 0x6e, + 0x63, 0x65, 0x6c, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x42, 0x3b, 0x5a, + 0x39, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x63, 0x6f, 0x6e, 0x76, + 0x65, 0x72, 0x67, 0x65, 0x64, 0x2d, 0x63, 0x6f, 0x6d, 0x70, 0x75, 0x74, 0x69, 0x6e, 0x67, 0x2f, + 0x66, 0x6c, 0x75, 0x78, 0x6e, 0x65, 0x74, 0x65, 0x73, 0x2f, 0x70, 0x6b, 0x67, 0x2f, 0x66, 0x6c, + 0x75, 0x78, 0x69, 0x6f, 0x6e, 0x2d, 0x67, 0x72, 0x70, 0x63, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, + 0x6f, 0x33, } var ( diff --git a/src/fluxnetes/pkg/fluxion-grpc/fluxion.proto b/src/fluxnetes/pkg/fluxion-grpc/fluxion.proto index b2ecdc6..88802fa 100644 --- a/src/fluxnetes/pkg/fluxion-grpc/fluxion.proto +++ b/src/fluxnetes/pkg/fluxion-grpc/fluxion.proto 
@@ -50,6 +50,8 @@ message MatchResponse { message CancelRequest { uint64 fluxID = 1; + // It's ok if it doesn't exist (don't issue an error) + bool NoExistOK = 2; } // The Match response message diff --git a/src/fluxnetes/pkg/fluxion/fluxion.go b/src/fluxnetes/pkg/fluxion/fluxion.go index d08625a..7a49c97 100644 --- a/src/fluxnetes/pkg/fluxion/fluxion.go +++ b/src/fluxnetes/pkg/fluxion/fluxion.go @@ -45,10 +45,13 @@ func (fluxion *Fluxion) InitFluxion(policy, label string) { } // Cancel wraps the Cancel function of the fluxion go bindings -func (fluxion *Fluxion) Cancel(ctx context.Context, in *pb.CancelRequest) (*pb.CancelResponse, error) { +func (fluxion *Fluxion) Cancel( + ctx context.Context, + in *pb.CancelRequest, +) (*pb.CancelResponse, error) { klog.Infof("[Fluxnetes] received cancel request %v\n", in) - err := fluxion.cli.Cancel(int64(in.FluxID), true) + err := fluxion.cli.Cancel(int64(in.FluxID), in.NoExistOK) if err != nil { return nil, err }
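Tying the pieces together, the sketch below (not part of the diff) shows how a caller is expected to use the updated `SubmitCleanup` signature from `cleanup.go`: the delayed cancel is only scheduled when the pod actually carries a deadline, matching the guard added in `job.go`, while pods without a deadline are left to the informer's delete event. The helper name `scheduleCleanup` and the assumption that `podspec` is the JSON-serialized pod are illustrative.

```go
package workers

import (
	"context"

	"github.com/jackc/pgx/v5/pgxpool"
	corev1 "k8s.io/api/core/v1"
)

// scheduleCleanup is a hypothetical wrapper around SubmitCleanup (defined in
// cleanup.go in this diff). It only schedules the delayed cancel when the pod
// has a positive deadline; services and other pods without a deadline are
// cleaned up through the informer's delete event instead.
func scheduleCleanup(ctx context.Context, pool *pgxpool.Pool, pod *corev1.Pod, podspec string, fluxID int64) error {
	seconds := pod.Spec.ActiveDeadlineSeconds
	if seconds == nil || *seconds <= 0 {
		// No deadline: nothing to schedule
		return nil
	}
	return SubmitCleanup(ctx, pool, seconds, podspec, fluxID, true, []string{})
}
```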