Skip to content

Commit

Permalink
events: add listener for update and delete
Browse files Browse the repository at this point in the history
While we do not act on these events yet (we need to think through
the logic), this adds the informers to the queue so that
we eventually can. We will likely react
to update events and deletion events.

Signed-off-by: vsoch <[email protected]>
  • Loading branch information
vsoch committed Aug 4, 2024
1 parent 915ed2d commit f32a992
Show file tree
Hide file tree
Showing 5 changed files with 83 additions and 7 deletions.
2 changes: 2 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,9 @@ SELECT group_name, group_size from pods_provisional;

Thinking:

- Need to walk through the deletion / update process - right now we have a cleanup event if there is a termination time; otherwise we wait for the pod event to reach the informer
- We can allow trying to schedule jobs in the future, although I'm not sure about that use case (add label to do this)
- What should we do if a pod is updated, and the group is removed?
- fluxion is deriving the nodes on its own, but we might get updated nodes from the scheduler. It might be good to think about how to use the fluxion-service container instead.
- more efficient to retrieve podspec from kubernetes instead of putting into database?

Expand Down
51 changes: 51 additions & 0 deletions kubernetes/pkg/fluxnetes/events.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
package fluxnetes

import (
corev1 "k8s.io/api/core/v1"
klog "k8s.io/klog/v2"
)

// UpdatePodEvent is called by the informer on a pod update; the old and new
// objects are both provided. For now we only log the phase transition - we do
// not act on updates yet (see commit message / README notes).
func (q *Queue) UpdatePodEvent(oldObj, newObj interface{}) {

	oldPod := oldObj.(*corev1.Pod)
	newPod := newObj.(*corev1.Pod)

	// a pod is updated, get the group. TODO: how to handle change in group name?
	// groupName := groups.GetPodGroupName(oldPod)
	switch oldPod.Status.Phase {
	case corev1.PodPending, corev1.PodRunning, corev1.PodSucceeded, corev1.PodFailed, corev1.PodUnknown:
		// All known phases log identically; interpolating the old phase keeps
		// the output the same as the original per-phase cases.
		klog.Infof("Received update event '%s' to '%s' for pod %s/%s", oldPod.Status.Phase, newPod.Status.Phase, oldPod.Namespace, oldPod.Name)
	default:
		// NOTE: the original format string had three verbs but four arguments
		// (a go vet printf error); both phases are now explicitly formatted.
		klog.Infof("Received unknown update event from '%s' to '%s' for pod %s/%s", oldPod.Status.Phase, newPod.Status.Phase, oldPod.Namespace, oldPod.Name)
	}
}

// DeletePodEvent is called by the informer when a pod is deleted. For now we
// only log the phase the pod was in at deletion time - cleanup logic will
// eventually hook in here.
func (q *Queue) DeletePodEvent(podObj interface{}) {
	pod := podObj.(*corev1.Pod)

	switch pod.Status.Phase {
	case corev1.PodPending, corev1.PodRunning, corev1.PodSucceeded, corev1.PodFailed, corev1.PodUnknown:
		// All known phases log identically; interpolating the phase keeps the
		// output the same as the original per-phase cases.
		klog.Infof("Received delete event '%s' for pod %s/%s", pod.Status.Phase, pod.Namespace, pod.Name)
	default:
		// NOTE: the original message said "update event" here - this is the
		// delete handler, so say so.
		klog.Infof("Received unknown delete event '%s' for pod %s/%s", pod.Status.Phase, pod.Namespace, pod.Name)
	}
}
22 changes: 20 additions & 2 deletions kubernetes/pkg/fluxnetes/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,12 @@ import (

"github.com/jackc/pgx/v5"
"github.com/jackc/pgx/v5/pgxpool"

"github.com/riverqueue/river"
"github.com/riverqueue/river/riverdriver/riverpgxv5"
"github.com/riverqueue/river/rivershared/util/slogutil"
"k8s.io/client-go/tools/cache"

"k8s.io/kubernetes/pkg/scheduler/framework"
groups "k8s.io/kubernetes/pkg/scheduler/framework/plugins/fluxnetes/group"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/fluxnetes/queries"
strategies "k8s.io/kubernetes/pkg/scheduler/framework/plugins/fluxnetes/strategy"
Expand All @@ -33,6 +34,7 @@ type Queue struct {
riverClient *river.Client[pgx.Tx]
EventChannel *QueueEvent
Strategy strategies.QueueStrategy
Handle framework.Handle

// IMPORTANT: subscriptions need to use same context
// that client submit them uses
Expand All @@ -55,7 +57,7 @@ type QueueEvent struct {
}

// NewQueue starts a new queue with a river client
func NewQueue(ctx context.Context) (*Queue, error) {
func NewQueue(ctx context.Context, handle framework.Handle) (*Queue, error) {
dbPool, err := pgxpool.New(ctx, os.Getenv("DATABASE_URL"))
if err != nil {
return nil, err
Expand Down Expand Up @@ -105,6 +107,7 @@ func NewQueue(ctx context.Context) (*Queue, error) {
Strategy: strategy,
Context: ctx,
ReservationDepth: depth,
Handle: handle,
}
queue.setupEvents()
return &queue, nil
Expand Down Expand Up @@ -139,6 +142,21 @@ func (q *Queue) setupEvents() {
q.EventChannel = &QueueEvent{Function: trigger, Channel: c}
}

// GetInformer returns the shared pod informer with our update/delete event
// handlers registered. The caller is expected to Run it as a goroutine,
// passing a stop channel (e.g. ctx.Done()).
func (q *Queue) GetInformer() cache.SharedIndexInformer {

	// Performance improvement when retrieving list of objects by namespace or we'll log 'index not exist' warning.
	podsInformer := q.Handle.SharedInformerFactory().Core().V1().Pods().Informer()

	// The namespace indexer is a performance optimization only; failure to add
	// it (e.g. the informer already started) is non-fatal, so the error is
	// deliberately discarded. TODO confirm we want to surface this instead.
	_ = podsInformer.AddIndexers(cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc})

	// Event handlers to call on update/delete for cleanup
	podsInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
		UpdateFunc: q.UpdatePodEvent,
		DeleteFunc: q.DeletePodEvent,
	})
	return podsInformer
}

// Enqueue a new job to the provisional queue
// 1. Assemble (discover or define) the group
// 2. Add to provisional table
Expand Down
4 changes: 0 additions & 4 deletions kubernetes/pkg/fluxnetes/strategy/workers/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,10 +161,6 @@ func (w JobWorker) Work(ctx context.Context, job *river.Job[JobArgs]) error {
return err
}
}

// Collect rows into single result
// pgx.CollectRows(rows, pgx.RowTo[string])
// klog.Infof("Values: %s", values)
klog.Infof("[JOB-WORKER-COMPLETE] nodes allocated %s for group %s (flux job id %d)\n",
nodeStr, job.Args.GroupName, job.Args.FluxJob)
return nil
Expand Down
11 changes: 10 additions & 1 deletion kubernetes/pkg/scheduler/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -460,9 +460,15 @@ func (sched *Scheduler) Run(ctx context.Context) {
// SchedulingQueue, in effect causing a deadlock on shutdown.
go wait.UntilWithContext(ctx, sched.ScheduleOne, 0)

// Get a handle to the fluxnetes framework
fwk, ok := sched.Profiles["fluxnetes"]
if !ok {
logger.Error(fmt.Errorf("Missing plugin"), "Cannot find fluxnetes plugin")
}

// This is the only added line to start our queue
logger.Info("[FLUXNETES]", "Starting", "queue")
queue, err := fluxnetes.NewQueue(ctx)
queue, err := fluxnetes.NewQueue(ctx, fwk)
if err != nil {
logger.Error(err, "Issue with Fluxnetes queue")
}
Expand All @@ -472,6 +478,9 @@ func (sched *Scheduler) Run(ctx context.Context) {
sched.Queue = queue
defer sched.Queue.Pool.Close()

// Get and run the informer (update, delete pod events)
go sched.Queue.GetInformer().Run(ctx.Done())

// Make an empty state for now, just for functions
state := framework.NewCycleState()

Expand Down

0 comments on commit f32a992

Please sign in to comment.