From f32a992f0378f732e110279e4d5c9e8e306b7628 Mon Sep 17 00:00:00 2001
From: vsoch
Date: Sat, 3 Aug 2024 23:55:54 -0600
Subject: [PATCH] events: add listener for update and delete

While we do not act on these yet (the logic still needs to be thought
through), this adds the informers to the queue so that we eventually can.
We will most likely react to update and deletion events.

Signed-off-by: vsoch
---
 README.md                                 |  2 +
 kubernetes/pkg/fluxnetes/events.go        | 51 +++++++++++++++++++
 kubernetes/pkg/fluxnetes/queue.go         | 22 +++++++-
 .../pkg/fluxnetes/strategy/workers/job.go |  4 --
 kubernetes/pkg/scheduler/scheduler.go     | 11 +++-
 5 files changed, 83 insertions(+), 7 deletions(-)
 create mode 100644 kubernetes/pkg/fluxnetes/events.go

diff --git a/README.md b/README.md
index 71d7c50..199518a 100644
--- a/README.md
+++ b/README.md
@@ -191,7 +191,9 @@ SELECT group_name, group_size from pods_provisional;
 
 Thinking:
 
+- Need to walk through the deletion / update process - right now we have a cleanup event if there is a termination time, otherwise we wait for the pod event from the informer
 - We can allow trying to schedule jobs in the future, although I'm not sure about that use case (add label to do this)
+- What should we do if a pod is updated, and the group is removed?
 - fluxion is deriving the nodes on its own, but we might get updated nodes from the scheduler. It might be good to think about how to use the fluxion-service container instead.
 - more efficient to retrieve podspec from kubernetes instead of putting into database?
 
diff --git a/kubernetes/pkg/fluxnetes/events.go b/kubernetes/pkg/fluxnetes/events.go
new file mode 100644
index 0000000..96c0969
--- /dev/null
+++ b/kubernetes/pkg/fluxnetes/events.go
@@ -0,0 +1,51 @@
+package fluxnetes
+
+import (
+	corev1 "k8s.io/api/core/v1"
+	klog "k8s.io/klog/v2"
+)
+
+// UpdatePodEvent is called on an update; the old and new objects are both provided
+func (q *Queue) UpdatePodEvent(oldObj, newObj interface{}) {
+
+	pod := oldObj.(*corev1.Pod)
+	newPod := newObj.(*corev1.Pod)
+
+	// A pod is updated, get the group. TODO: how do we handle a change in group name?
+	// groupName := groups.GetPodGroupName(pod)
+	switch pod.Status.Phase {
+	case corev1.PodPending:
+		klog.Infof("Received update event from 'Pending' to '%s' for pod %s/%s", newPod.Status.Phase, pod.Namespace, pod.Name)
+	case corev1.PodRunning:
+		klog.Infof("Received update event from 'Running' to '%s' for pod %s/%s", newPod.Status.Phase, pod.Namespace, pod.Name)
+	case corev1.PodSucceeded:
+		klog.Infof("Received update event from 'Succeeded' to '%s' for pod %s/%s", newPod.Status.Phase, pod.Namespace, pod.Name)
+	case corev1.PodFailed:
+		klog.Infof("Received update event from 'Failed' to '%s' for pod %s/%s", newPod.Status.Phase, pod.Namespace, pod.Name)
+	case corev1.PodUnknown:
+		klog.Infof("Received update event from 'Unknown' to '%s' for pod %s/%s", newPod.Status.Phase, pod.Namespace, pod.Name)
+	default:
+		klog.Infof("Received unknown update event from '%s' to '%s' for pod %s/%s", pod.Status.Phase, newPod.Status.Phase, pod.Namespace, pod.Name)
+	}
+}
+
+// DeletePodEvent handles the delete event
+func (q *Queue) DeletePodEvent(podObj interface{}) {
+	pod := podObj.(*corev1.Pod)
+
+	switch pod.Status.Phase {
+	case corev1.PodPending:
+		klog.Infof("Received delete event 'Pending' for pod %s/%s", pod.Namespace, pod.Name)
+	case corev1.PodRunning:
+		klog.Infof("Received delete event 'Running' for pod %s/%s", pod.Namespace, pod.Name)
+	case corev1.PodSucceeded:
+		klog.Infof("Received delete event 'Succeeded' for pod %s/%s", pod.Namespace, pod.Name)
+	case corev1.PodFailed:
+		klog.Infof("Received delete event 'Failed' for pod %s/%s", pod.Namespace, pod.Name)
+	case corev1.PodUnknown:
+		klog.Infof("Received delete event 'Unknown' for pod %s/%s", pod.Namespace, pod.Name)
+	default:
+		klog.Infof("Received unknown delete event '%s' for pod %s/%s", pod.Status.Phase, pod.Namespace, pod.Name)
+	}
+
+}
diff --git a/kubernetes/pkg/fluxnetes/queue.go b/kubernetes/pkg/fluxnetes/queue.go
index 3e0019a..affa822 100644
--- a/kubernetes/pkg/fluxnetes/queue.go
+++ b/kubernetes/pkg/fluxnetes/queue.go
@@ -12,11 +12,12 @@ import (
 
 	"github.com/jackc/pgx/v5"
 	"github.com/jackc/pgx/v5/pgxpool"
-
 	"github.com/riverqueue/river"
 	"github.com/riverqueue/river/riverdriver/riverpgxv5"
 	"github.com/riverqueue/river/rivershared/util/slogutil"
+	"k8s.io/client-go/tools/cache"
+	"k8s.io/kubernetes/pkg/scheduler/framework"
 
 	groups "k8s.io/kubernetes/pkg/scheduler/framework/plugins/fluxnetes/group"
 	"k8s.io/kubernetes/pkg/scheduler/framework/plugins/fluxnetes/queries"
 	strategies "k8s.io/kubernetes/pkg/scheduler/framework/plugins/fluxnetes/strategy"
@@ -33,6 +34,7 @@ type Queue struct {
 	riverClient  *river.Client[pgx.Tx]
 	EventChannel *QueueEvent
 	Strategy     strategies.QueueStrategy
+	Handle       framework.Handle
 
 	// IMPORTANT: subscriptions need to use same context
 	// that client submit them uses
@@ -55,7 +57,7 @@ type QueueEvent struct {
 }
 
 // NewQueue starts a new queue with a river client
-func NewQueue(ctx context.Context) (*Queue, error) {
+func NewQueue(ctx context.Context, handle framework.Handle) (*Queue, error) {
 	dbPool, err := pgxpool.New(ctx, os.Getenv("DATABASE_URL"))
 	if err != nil {
 		return nil, err
@@ -105,6 +107,7 @@ func NewQueue(ctx context.Context) (*Queue, error) {
 		Strategy:         strategy,
 		Context:          ctx,
 		ReservationDepth: depth,
+		Handle:           handle,
 	}
 	queue.setupEvents()
 	return &queue, nil
@@ -139,6 +142,21 @@ func (q *Queue) setupEvents() {
 	q.EventChannel = &QueueEvent{Function: trigger, Channel: c}
 }
 
+// GetInformer returns the pod informer to run as a goroutine
+func (q *Queue) GetInformer() cache.SharedIndexInformer {
+
+	// Performance improvement when retrieving the list of objects by namespace, otherwise we will log an 'index not exist' warning.
+	podsInformer := q.Handle.SharedInformerFactory().Core().V1().Pods().Informer()
+	podsInformer.AddIndexers(cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc})
+
+	// Event handlers to call on update/delete for cleanup
+	podsInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
+		UpdateFunc: q.UpdatePodEvent,
+		DeleteFunc: q.DeletePodEvent,
+	})
+	return podsInformer
+}
+
 // Enqueue a new job to the provisional queue
 // 1. Assemble (discover or define) the group
 // 2. Add to provisional table
diff --git a/kubernetes/pkg/fluxnetes/strategy/workers/job.go b/kubernetes/pkg/fluxnetes/strategy/workers/job.go
index 83a4113..48267e1 100644
--- a/kubernetes/pkg/fluxnetes/strategy/workers/job.go
+++ b/kubernetes/pkg/fluxnetes/strategy/workers/job.go
@@ -161,10 +161,6 @@ func (w JobWorker) Work(ctx context.Context, job *river.Job[JobArgs]) error {
 			return err
 		}
 	}
-
-	// Collect rows into single result
-	// pgx.CollectRows(rows, pgx.RowTo[string])
-	// klog.Infof("Values: %s", values)
 	klog.Infof("[JOB-WORKER-COMPLETE] nodes allocated %s for group %s (flux job id %d)\n",
 		nodeStr, job.Args.GroupName, job.Args.FluxJob)
 	return nil
diff --git a/kubernetes/pkg/scheduler/scheduler.go b/kubernetes/pkg/scheduler/scheduler.go
index aa0f0ef..d70f950 100644
--- a/kubernetes/pkg/scheduler/scheduler.go
+++ b/kubernetes/pkg/scheduler/scheduler.go
@@ -460,9 +460,15 @@ func (sched *Scheduler) Run(ctx context.Context) {
 	// SchedulingQueue, in effect causing a deadlock on shutdown.
 	go wait.UntilWithContext(ctx, sched.ScheduleOne, 0)
 
+	// Get a handle to the fluxnetes framework
+	fwk, ok := sched.Profiles["fluxnetes"]
+	if !ok {
+		logger.Error(fmt.Errorf("missing plugin"), "Cannot find fluxnetes plugin")
+	}
+
 	// This is the only added line to start our queue
 	logger.Info("[FLUXNETES]", "Starting", "queue")
-	queue, err := fluxnetes.NewQueue(ctx)
+	queue, err := fluxnetes.NewQueue(ctx, fwk)
 	if err != nil {
 		logger.Error(err, "Issue with Fluxnetes queue")
 	}
@@ -472,6 +478,9 @@ func (sched *Scheduler) Run(ctx context.Context) {
 
 	sched.Queue = queue
 	defer sched.Queue.Pool.Close()
 
+	// Get and run the informer (update, delete pod events)
+	go sched.Queue.GetInformer().Run(ctx.Done())
+
 	// Make an empty state for now, just for functions
 	state := framework.NewCycleState()
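
Neither handler acts on the event yet. As one possible direction for the delete case, here is a minimal sketch (not part of this patch): derive the group the way the commented-out GetPodGroupName line above hints at, then clear the pod's row from the provisional table through the queue's Pool. The cleanupDeletedPod name, the namespace/name columns on pods_provisional, and the GetPodGroupName signature are assumptions for illustration only.

package fluxnetes

import (
	"context"

	corev1 "k8s.io/api/core/v1"
	klog "k8s.io/klog/v2"

	groups "k8s.io/kubernetes/pkg/scheduler/framework/plugins/fluxnetes/group"
)

// cleanupDeletedPod sketches how DeletePodEvent might eventually clean up the
// provisional table when a pod goes away, so a group that can no longer reach
// its size is not scheduled later. The table columns and the GetPodGroupName
// signature are assumed, not confirmed by this patch.
func (q *Queue) cleanupDeletedPod(ctx context.Context, pod *corev1.Pod) {
	// Derive the group name (presumably the same way Enqueue does)
	groupName := groups.GetPodGroupName(pod)

	// Hypothetical query; the real column layout may differ
	query := "DELETE FROM pods_provisional WHERE group_name = $1 AND namespace = $2 AND name = $3"
	if _, err := q.Pool.Exec(ctx, query, groupName, pod.Namespace, pod.Name); err != nil {
		klog.Errorf("Issue cleaning up deleted pod %s/%s: %s", pod.Namespace, pod.Name, err)
	}
}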