diff --git a/README.md b/README.md index 907c87b..586adb1 100644 --- a/README.md +++ b/README.md @@ -175,29 +175,34 @@ SELECT group_name, group_size from pods_provisional; - [ ] I'd like a more efficient query (or strategy) to move pods from provisional into the worker queue. Right now I have three queries and it's too many. - [ ] Restarting with postgres shouldn't have crashloopbackoff when the database isn't ready yet - [ ] In-tree registry plugins (that are related to resources) should be run first to inform fluxion what nodes not to bind, where there are volumes, etc. +- [ ] Add back in the created at filter / sort to the queues (removed when I was testing / debugging harder stuff) - [ ] The queue should inherit (and return) the start time (when the pod was first seen) "start" in scheduler.go - Testing: - [ ] need to test duration / completion time works (run job with short duration, should be cancelled/cleaned up) - [ ] spam submission and test reservations (and cancel) - [ ] implement other queue strategies (fcfs and backfill with > 1 reservation depth) - fcfs can work by only adding one job (first in provisional) to the worker queue at once, only when it's empty! lol. -- [ ] create state diagram that shows how stuff works - [ ] Decide what to do on events - currently we delete / cleanup when there is a decided timeout for pod/job - Arguably, we need to respond to these events for services, etc., where a cleanup job is not scheduled. - This means we need a way to issue cancel to fluxion, and for fluxion to distinguish between 404 and another error. -- [ ] When a job is not able to schedule, it should go into a rejected queue, which should finish and return a NOT SCHEDULABLE status. + - we also need to look up the flux jobid for a pod on deletion so we can issue a cancel - [ ] In cleanup we will need to handle [BlockOwnerDeletion](https://github.com/kubernetes/kubernetes/blob/dbc2b0a5c7acc349ea71a14e49913661eaf708d2/staging/src/k8s.io/apimachinery/pkg/apis/meta/v1/types.go#L319). I don't yet understand the cases under which this is used, but likely we want to delete the child object and allow the owner to do whatever is the default (create another pod, etc.) Thinking: -- How do we distinguish between a cancel to fluxion (error) vs. error because it was already cancelled? - - How would that happen? - Need to walk through deletion / update process - right now we have cleanup event if there is termination time, otherwise we wait for pod event to informer - We can allow trying to schedule jobs in the future, although I'm not sure about that use case (add label to do this) - What should we do if a pod is updated, and the group is removed? - fluxion is deriving the nodes on its own, but we might get updated nodes from the scheduler. It might be good to think about how to use the fluxion-service container instead. - more efficient to retrieve podspec from kubernetes instead of putting into database? +TODO: + +- test job that has too many resources and won't pass (it should not make it to provisional or pending_queue) + - can we do a satisfies first? + - we probably need a unique constraint on the insert... +- when that works, a pod that is completed / done needs to be removed from pending + ## License HPCIC DevTools is distributed under the terms of the MIT license. 
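One note on the "can we do a satisfies first?" item above: below is a minimal sketch of what an enqueue-time pre-check could look like, assuming (hypothetically) that the fluxion gRPC service exposed a satisfiability endpoint. The `Satisfy` call, `SatisfyRequest`, `Satisfied` field, and `NewFluxionServiceClient` names are placeholders and not the current fluxion-grpc API; the sketch only illustrates where such a check would slot in, before a group ever reaches provisional or pending_queue.

```go
package main

import (
	"context"

	"google.golang.org/grpc"

	pb "k8s.io/kubernetes/pkg/scheduler/framework/plugins/fluxnetes/fluxion-grpc"
)

// satisfiesFirst is a hypothetical pre-check: reject unsatisfiable groups before
// they are inserted into pods_provisional / pending_queue. None of the Satisfy
// names below exist in fluxion-grpc today; they stand in for a future endpoint.
func satisfiesFirst(ctx context.Context, podspec string) (bool, error) {
	conn, err := grpc.Dial("127.0.0.1:4242", grpc.WithInsecure())
	if err != nil {
		return false, err
	}
	defer conn.Close()

	client := pb.NewFluxionServiceClient(conn) // assumed client constructor
	response, err := client.Satisfy(ctx, &pb.SatisfyRequest{Podspec: podspec})
	if err != nil {
		return false, err
	}
	// If not satisfiable, the caller would skip Enqueue entirely and mark the
	// pod group unschedulable instead of letting the worker cancel it later.
	return response.Satisfied, nil
}
```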
diff --git a/chart/crds/topology.node.k8s.io_noderesourcetopologies.yaml b/chart/crds/topology.node.k8s.io_noderesourcetopologies.yaml deleted file mode 120000 index 288bd96..0000000 --- a/chart/crds/topology.node.k8s.io_noderesourcetopologies.yaml +++ /dev/null @@ -1 +0,0 @@ -../noderesourcetopology/crd.yaml \ No newline at end of file diff --git a/docs/README.md b/docs/README.md index 96e99dc..650ec19 100644 --- a/docs/README.md +++ b/docs/README.md @@ -97,6 +97,14 @@ We will eventually have other strategies that allow for more reservations. This - **Default** "workers queue" (is what I call it) is what handles asking fluxion for allocations. This is the main queue. - **Cleanup** "cancel queue" is what handles canceling reservations, and when pods are cancelled (to be implemented) it will handle that as well. It's a different queue (and different workers) so the jobs do not collide. +#### State Diagram + +The following overview and diagrams describe the above components and show basic states. + +![images/state-overview.png](images/state-overview.png) + +![images/state-diagram.png](images/state-diagram.png) + ## Notes diff --git a/docs/images/state-diagram.png b/docs/images/state-diagram.png new file mode 100644 index 0000000..82468eb Binary files /dev/null and b/docs/images/state-diagram.png differ diff --git a/docs/images/state-overview.png b/docs/images/state-overview.png new file mode 100644 index 0000000..f1d0a75 Binary files /dev/null and b/docs/images/state-overview.png differ diff --git a/examples/job-unsatisfiable.yaml b/examples/job-unsatisfiable.yaml new file mode 100644 index 0000000..0ba2639 --- /dev/null +++ b/examples/job-unsatisfiable.yaml @@ -0,0 +1,23 @@ +apiVersion: batch/v1 +kind: Job +metadata: + name: job +spec: + completions: 10 + parallelism: 10 + template: + metadata: + labels: + fluxnetes.group-name: job + fluxnetes.group-size: "10" + spec: + schedulerName: fluxnetes + containers: + - name: job + image: busybox + command: [echo, potato] + resources: + requests: + cpu: 20 + restartPolicy: Never + backoffLimit: 4 diff --git a/examples/job.yaml b/examples/job.yaml index 43f2d0e..3a119fc 100644 --- a/examples/job.yaml +++ b/examples/job.yaml @@ -4,9 +4,6 @@ metadata: name: job spec: template: - metadata: - labels: - fluxnetes.group-name: job spec: schedulerName: fluxnetes containers: diff --git a/kubernetes/pkg/fluxnetes/events.go b/kubernetes/pkg/fluxnetes/events.go index d3a909e..75b06f2 100644 --- a/kubernetes/pkg/fluxnetes/events.go +++ b/kubernetes/pkg/fluxnetes/events.go @@ -1,10 +1,18 @@ package fluxnetes import ( + "k8s.io/kubernetes/pkg/scheduler/framework/plugins/fluxnetes/strategy/workers" + corev1 "k8s.io/api/core/v1" klog "k8s.io/klog/v2" ) +// Cleanup deletes a pod. It is assumed that it cannot be scheduled +// This means we do not have a flux id to cancel (-1) +func (q Queue) Cleanup(pod *corev1.Pod, podspec, groupName string) error { + return workers.Cleanup(q.Context, podspec, int64(-1), true, groupName) +} + // UpdatePodEvent is called on an update, and the old and new object are presented func (q *Queue) UpdatePodEvent(oldObj, newObj interface{}) { @@ -42,7 +50,8 @@ func (q *Queue) DeletePodEvent(podObj interface{}) { klog.Infof("Received delete event 'Running' for pod %s/%s", pod.Namespace, pod.Name) case corev1.PodSucceeded: klog.Infof("Received delete event 'Succeeded' for pod %s/%s", pod.Namespace, pod.Name) - // TODO insert submit cleanup here - need a way to get the fluxId + // TODO insert submit cleanup here - get the fluxid from pending? 
+ // TODO we need somewhere to remove the pod from pending // Likely we can keep around the group name and flux id in a database, and get / delete from there. // err = SubmitCleanup(ctx, pool, pod.Spec.ActiveDeadlineSeconds, job.Args.Podspec, int64(fluxID), true, []string{}) //if err != nil { diff --git a/kubernetes/pkg/fluxnetes/fluxnetes.go b/kubernetes/pkg/fluxnetes/fluxnetes.go index d755ead..2498096 100644 --- a/kubernetes/pkg/fluxnetes/fluxnetes.go +++ b/kubernetes/pkg/fluxnetes/fluxnetes.go @@ -27,7 +27,9 @@ import ( ) var ( - GroupName = "scheduling.x-k8s.io" + CancelledState = "cancelled" + CleanupQueue = "cleanup" + Unsatisfiable = "unsatisfiable" ) // JobResult serializes a result from Fluxnetes in the scheduler back to metadata diff --git a/kubernetes/pkg/fluxnetes/queries/queries.go b/kubernetes/pkg/fluxnetes/queries/queries.go index ad33c88..0c1866f 100644 --- a/kubernetes/pkg/fluxnetes/queries/queries.go +++ b/kubernetes/pkg/fluxnetes/queries/queries.go @@ -1,29 +1,45 @@ package queries -// Queries used by the main queue (and shared across strategies sometimes) const ( - GetTimestampQuery = "select created_at from pods_provisional where group_name=$1 limit 1" - GetPodQuery = "select * from pods_provisional where group_name=$1 and namespace=$2 and name=$3" - InsertPodQuery = "insert into pods_provisional (podspec, namespace, name, duration, created_at, group_name, group_size) values ($1, $2, $3, $4, $5, $6, $7)" - CountPodsQuery = "select count(*) from pods_provisional where group_name=$1" - UpdateNodesQuery = "update river_job set args = jsonb_set(args, '{nodes}', to_jsonb($1::text)) where id=$2;" + // Used to get the earliest timestamp for the group + GetTimestampQuery = "select created_at from pods_provisional where group_name=$1 and namespace=$2 limit 1;" + + // When we complete a job worker type after a successful MatchAllocate, this is how we send nodes back via an event + UpdateNodesQuery = "update river_job set args = jsonb_set(args, '{nodes}', to_jsonb($1::text)) where id=$2;" // Reservations AddReservationQuery = "insert into reservations (group_name, flux_id) values ($1, $2);" DeleteReservationsQuery = "truncate reservations; delete from reservations;" GetReservationsQuery = "select (group_name, flux_id) from reservations;" - // This query should achieve the following (but does not work) + // This query should achieve the following // 1. Select groups for which the size >= the number of pods we've seen - 2. Then get the group_name, group_size, and podspec for each (this goes to scheduler) - Ensures we are sorting by the timestamp when they were added (should be DESC I think) - RefreshGroupsQuery = "refresh materialized view groups_size;" - SelectGroupsReadyQuery = "select * from pods_provisional join groups_size on pods_provisional.group_name = groups_size.group_name where group_size >= count order by created_at desc;" + // 2. 
Then get a representative pod to model the resources for the group + // TODO add back created_at and then sort by it desc + SelectGroupsAtSizeQuery = "select group_name, group_size, duration, podspec, namespace from groups_provisional where current_size >= group_size;" + SelectRepresentativePodQuery = `select podspec from pods_provisional where group_name = $1 and namespace = $2;` + + // Pending queue - inserted after moving from provisional + InsertIntoPending = "insert into pending_queue (group_name, namespace, group_size) SELECT '%s', '%s', '%d' WHERE NOT EXISTS (SELECT (group_name, namespace) FROM pending_queue WHERE group_name = '%s' and namespace = '%s');" + + // We delete from the provisional tables when a group is added to the work queues (and pending queue, above) + DeleteProvisionalGroupsQuery = "delete from groups_provisional where %s;" + DeleteGroupsQuery = "delete from pods_provisional where %s;" + + // TODO add created_at back + InsertIntoProvisionalQuery = "insert into pods_provisional (podspec, namespace, name, duration, group_name) select '%s', '%s', '%s', %d, '%s' where not exists (select (group_name, name, namespace) from pods_provisional where group_name = '%s' and namespace = '%s' and name = '%s');" + + // Enqueue queries + // 1. Single pods are added to the pods_provisional - this is how we track uniqueness (and eventually will grab all podspecs from here) + // 2. Groups are added to the groups_provisional, and this is where we can easily store a current count + // Note that we add a current_size of 1 here assuming the first creation is done paired with an existing pod (and then don't need to increment again) + InsertIntoGroupProvisional = "insert into groups_provisional (group_name, namespace, group_size, duration, podspec, current_size) select '%s', '%s', '%d', '%d', '%s', '1' WHERE NOT EXISTS (SELECT (group_name, namespace) FROM groups_provisional WHERE group_name = '%s' and namespace = '%s');" + IncrementGroupProvisional = "update groups_provisional set current_size = current_size + 1 where group_name = '%s' and namespace = '%s';" - // 3. Then delete all from the table - DeleteGroupsQuery = "delete from pods_provisional where group_name in ('%s');" + // Pending Queue queries + // 3. 
We always check if a group is in pending before Enqueue, because if so, we aren't allowed to modify / add to the group + IsPendingQuery = "select * from pending_queue where group_name = $1 and namespace = $2;" - // Note that is used to be done with two queries - these are no longer used - SelectGroupsAtSizeQuery = "select group_name from pods_provisional group by group_name, group_size, created_at having group_size >= count(*) order by created_at desc;" - SelectGroupsQuery = "select group_name, group_size, podspec, duration from pods_provisional where group_name in ('%s');" + // We remove from pending to allow another group submission of the same name on cleanup + DeleteFromPendingQuery = "delete from pending_queue where group_name=$1 and namespace=$2;" ) diff --git a/kubernetes/pkg/fluxnetes/queue.go b/kubernetes/pkg/fluxnetes/queue.go index affa822..c83765f 100644 --- a/kubernetes/pkg/fluxnetes/queue.go +++ b/kubernetes/pkg/fluxnetes/queue.go @@ -21,6 +21,7 @@ import ( groups "k8s.io/kubernetes/pkg/scheduler/framework/plugins/fluxnetes/group" "k8s.io/kubernetes/pkg/scheduler/framework/plugins/fluxnetes/queries" strategies "k8s.io/kubernetes/pkg/scheduler/framework/plugins/fluxnetes/strategy" + "k8s.io/kubernetes/pkg/scheduler/framework/plugins/fluxnetes/types" ) const ( @@ -58,7 +59,7 @@ type QueueEvent struct { // NewQueue starts a new queue with a river client func NewQueue(ctx context.Context, handle framework.Handle) (*Queue, error) { - dbPool, err := pgxpool.New(ctx, os.Getenv("DATABASE_URL")) + pool, err := pgxpool.New(ctx, os.Getenv("DATABASE_URL")) if err != nil { return nil, err } @@ -71,7 +72,7 @@ func NewQueue(ctx context.Context, handle framework.Handle) (*Queue, error) { // Each strategy has its own worker type strategy.AddWorkers(workers) - riverClient, err := river.NewClient(riverpgxv5.New(dbPool), &river.Config{ + riverClient, err := river.NewClient(riverpgxv5.New(pool), &river.Config{ // Change the verbosity of the logger here Logger: slog.New(&slogutil.SlogMessageOnlyHandler{Level: slog.LevelWarn}), Queues: map[string]river.QueueConfig{ @@ -103,7 +104,7 @@ func NewQueue(ctx context.Context, handle framework.Handle) (*Queue, error) { queue := Queue{ riverClient: riverClient, - Pool: dbPool, + Pool: pool, Strategy: strategy, Context: ctx, ReservationDepth: depth, @@ -160,26 +161,26 @@ func (q *Queue) GetInformer() cache.SharedIndexInformer { // Enqueue a new job to the provisional queue // 1. Assemble (discover or define) the group // 2. Add to provisional table -func (q *Queue) Enqueue(pod *corev1.Pod) error { +func (q *Queue) Enqueue(pod *corev1.Pod) (types.EnqueueStatus, error) { // Get the pod name, duration (seconds) and size, first from labels, then defaults groupName := groups.GetPodGroupName(pod) size, err := groups.GetPodGroupSize(pod) if err != nil { - return err + + return types.Unknown, err } // Get the creation timestamp for the group ts, err := q.GetCreationTimestamp(pod, groupName) if err != nil { - return err + return types.Unknown, err } duration, err := groups.GetPodGroupDuration(pod) if err != nil { - return err + return types.Unknown, err } - // Log the namespace/name, group name, and size klog.Infof("Pod %s has Group %s (%d, %d seconds) created at %s", pod.Name, groupName, size, duration, ts) @@ -199,18 +200,21 @@ func (q *Queue) Enqueue(pod *corev1.Pod) error { // This mimics what Kubernetes does. Note that jobs can be sorted // based on the scheduled at time AND priority. 
func (q *Queue) Schedule() error { - // Queue Strategy "Schedule" moves provional to the worker queue + // Queue Strategy "Schedule" moves provisional to the worker queue + // We get them back in a batch to schedule + batch, err := q.Strategy.Schedule(q.Context, q.Pool, q.ReservationDepth) if err != nil { return err } - count, err := q.riverClient.InsertMany(q.Context, batch) - if err != nil { - return err + if len(batch) > 0 { + count, err := q.riverClient.InsertMany(q.Context, batch) + if err != nil { + return err + } + klog.Infof("[Fluxnetes] Schedule inserted %d jobs\n", count) } - klog.Infof("[Fluxnetes] Schedule inserted %d jobs\n", count) // Post submit functions return q.Strategy.PostSubmit(q.Context, q.Pool, q.riverClient) @@ -223,8 +227,8 @@ func (q *Queue) GetCreationTimestamp(pod *corev1.Pod, groupName string) (metav1. // First see if we've seen the group before, the creation times are shared across a group ts := metav1.MicroTime{} - // This query will fail if there are no rows (the podGroup is not known) - row := q.Pool.QueryRow(context.Background(), queries.GetTimestampQuery, groupName) + // This query will fail if there are no rows (the podGroup is not known in the namespace) + row := q.Pool.QueryRow(context.Background(), queries.GetTimestampQuery, groupName, pod.Namespace) err := row.Scan(&ts) if err == nil { klog.Info("Creation timestamp is", ts) diff --git a/kubernetes/pkg/fluxnetes/strategy/easy.go b/kubernetes/pkg/fluxnetes/strategy/easy.go index 6660c93..f9864de 100644 --- a/kubernetes/pkg/fluxnetes/strategy/easy.go +++ b/kubernetes/pkg/fluxnetes/strategy/easy.go @@ -15,6 +15,7 @@ import ( "k8s.io/kubernetes/pkg/scheduler/framework/plugins/fluxnetes/queries" "k8s.io/kubernetes/pkg/scheduler/framework/plugins/fluxnetes/strategy/provisional" work "k8s.io/kubernetes/pkg/scheduler/framework/plugins/fluxnetes/strategy/workers" + "k8s.io/kubernetes/pkg/scheduler/framework/plugins/fluxnetes/types" ) // Easy with Backfill @@ -59,7 +60,6 @@ func (s EasyBackfill) Schedule( reservationDepth int32, ) ([]river.InsertManyParams, error) { - // TODO move logic from here into provisional pending := provisional.NewProvisionalQueue(pool) // Is this group ready to be scheduled with the addition of this pod? @@ -91,7 +91,7 @@ func (s EasyBackfill) Schedule( } batch = append(batch, args) } - return batch, err + return batch, nil } // PostSubmit does clearing of reservations @@ -142,18 +142,20 @@ func (s EasyBackfill) PostSubmit( } // Insert the cleanup jobs - count, err := riverClient.InsertMany(ctx, batch) - if err != nil { - return err - } - klog.Infof("[easy] post cleanup (cancel) of %d jobs", count) + if len(batch) > 0 { + count, err := riverClient.InsertMany(ctx, batch) + if err != nil { + return err + } + klog.Infof("[easy] post cleanup (cancel) of %d jobs", count) - // Now cleanup! - dRows, err := pool.Query(ctx, queries.DeleteReservationsQuery) - if err != nil { - return err + // Now cleanup! 
+ dRows, err := pool.Query(ctx, queries.DeleteReservationsQuery) + if err != nil { + return err + } + defer dRows.Close() } - defer dRows.Close() return nil } @@ -162,7 +164,7 @@ func (s EasyBackfill) Enqueue( pool *pgxpool.Pool, pod *corev1.Pod, group *groups.PodGroup, -) error { +) (types.EnqueueStatus, error) { pending := provisional.NewProvisionalQueue(pool) return pending.Enqueue(ctx, pod, group) } diff --git a/kubernetes/pkg/fluxnetes/strategy/provisional/provisional.go b/kubernetes/pkg/fluxnetes/strategy/provisional/provisional.go index 581c4fb..731e742 100644 --- a/kubernetes/pkg/fluxnetes/strategy/provisional/provisional.go +++ b/kubernetes/pkg/fluxnetes/strategy/provisional/provisional.go @@ -4,27 +4,34 @@ import ( "context" "encoding/json" "fmt" + "os" "strings" "github.com/jackc/pgx/v5" "github.com/jackc/pgx/v5/pgxpool" - "github.com/jackc/pgx/v5/pgtype" corev1 "k8s.io/api/core/v1" klog "k8s.io/klog/v2" groups "k8s.io/kubernetes/pkg/scheduler/framework/plugins/fluxnetes/group" "k8s.io/kubernetes/pkg/scheduler/framework/plugins/fluxnetes/queries" "k8s.io/kubernetes/pkg/scheduler/framework/plugins/fluxnetes/strategy/workers" + "k8s.io/kubernetes/pkg/scheduler/framework/plugins/fluxnetes/types" ) // Job Database Model we are retrieving for jobs // We will eventually want more than these three type JobModel struct { GroupName string `db:"group_name"` + Namespace string `db:"namespace"` GroupSize int32 `db:"group_size"` - Podspec string `db:"podspec"` Duration int32 `db:"duration"` - // CreatedAt time.Time `db:"created_at"` + Podspec string `db:"podspec"` +} + +// GroupModel provides the group name and namespace for groups at size +type GroupModel struct { + GroupName string `db:"group_name"` + Namespace string `db:"namespace"` } // The provisional queue is a custom queue (to go along with a queue strategy attached @@ -41,79 +48,103 @@ type ProvisionalQueue struct { pool *pgxpool.Pool } -// Enqueue adds a pod to the provisional queue. A pool database connection is required, -// which comes from the main Fluxnetes queue. -func (q *ProvisionalQueue) Enqueue( +// incrementGroupProvisional adds 1 to the count of the group in the group provisional queue +func incrementGroupProvisional( ctx context.Context, + pool *pgxpool.Pool, pod *corev1.Pod, group *groups.PodGroup, ) error { - // This query will fail if there are no rows (the podGroup is not known) - var groupName, name, namespace string - row := q.pool.QueryRow(context.Background(), queries.GetPodQuery, group.Name, pod.Namespace, pod.Name) - err := row.Scan(groupName, name, namespace) + // Up the size of the group in provisional here + query := fmt.Sprintf(queries.IncrementGroupProvisional, group.Name, pod.Namespace) + klog.Infof("Incrementing group %s by 1 with pod %s", group.Name, pod.Name) + _, err := pool.Exec(ctx, query) + return err + +} - // We didn't find the pod in the table - add it. +// Enqueue adds a pod to the provisional queue, and if not yet added, the group to the group provisional queue. +// A pool database connection is required, which comes from the main Fluxnetes queue. 
+func (q *ProvisionalQueue) Enqueue( + ctx context.Context, + pod *corev1.Pod, + group *groups.PodGroup, +) (types.EnqueueStatus, error) { + + pool, err := pgxpool.New(context.Background(), os.Getenv("DATABASE_URL")) if err != nil { - klog.Errorf("Did not find pod %s in group %s in table", pod.Name, group) + klog.Errorf("Issue creating new pool %s", err) + return types.Unknown, err + } + defer pool.Close() - // Prepare timestamp and podspec for insertion... - ts := &pgtype.Timestamptz{Time: group.Timestamp.Time, Valid: true} - podspec, err := json.Marshal(pod) - if err != nil { - return err - } - _, err = q.pool.Query(ctx, queries.InsertPodQuery, string(podspec), pod.Namespace, - pod.Name, group.Duration, ts, group.Name, group.Size) + // First check - a pod group in pending is not allowed to enqueue new pods. + // This means the job has been submitted / is running (and not completed). The select returns a "SELECT 1" command tag when a row exists. + result, err := pool.Exec(context.Background(), queries.IsPendingQuery, group.Name, pod.Namespace) + if err != nil { + klog.Infof("Error checking if pod %s/%s group is in pending queue", pod.Namespace, pod.Name) + return types.Unknown, err + } + if strings.Contains(result.String(), "SELECT 1") { + return types.GroupAlreadyInPending, nil + } - // Show the user a success or an error, we return it either way - if err != nil { - klog.Infof("Error inserting provisional pod %s", err) - } - return err + // Here we add to single pod provisional. + // Prepare timestamp and podspec for insertion... + podspec, err := json.Marshal(pod) + if err != nil { + klog.Infof("Error with pod marshal %s/%s when adding to provisional", pod.Namespace, pod.Name) + return types.PodInvalid, err } - return err -} -// queryGroupsAtSize returns groups that have achieved minimum size -func (q *ProvisionalQueue) queryGroupsAtSize(ctx context.Context, pool *pgxpool.Pool) ([]string, error) { + // Insert the pod, or do nothing if it already exists + // TODO add back timestamp, and optimize this function to minimize database exec calls + // ts := &pgtype.Timestamptz{Time: group.Timestamp.Time, Valid: true} + query := fmt.Sprintf(queries.InsertIntoProvisionalQuery, string(podspec), pod.Namespace, pod.Name, group.Duration, group.Name, group.Name, pod.Namespace, pod.Name) + _, err = pool.Exec(context.Background(), query) + if err != nil { + klog.Infof("Error inserting pod %s/%s into provisional queue", pod.Namespace, pod.Name) + return types.Unknown, err + } - // First retrieve the group names that are the right size - rows, err := pool.Query(ctx, queries.SelectGroupsAtSizeQuery) + err = incrementGroupProvisional(context.Background(), pool, pod, group) if err != nil { - return nil, err + klog.Infof("Error incrementing Pod %s/%s", pod.Namespace, pod.Name) + return types.Unknown, err } - defer rows.Close() - // Collect rows into single result - groupNames, err := pgx.CollectRows(rows, pgx.RowTo[string]) - return groupNames, err + // Next add to group provisional - this will only add the group if it does not exist, and in that case the count + // starts at 1 so we avoid doing the increment call. 
+ // TODO eventually need to insert timestamp here + query = fmt.Sprintf(queries.InsertIntoGroupProvisional, group.Name, pod.Namespace, group.Size, group.Duration, string(podspec), group.Name, pod.Namespace) + _, err = pool.Exec(ctx, query) + if err != nil { + klog.Infof("Error inserting group into provisional %s", err) + return types.Unknown, err + } + return types.PodEnqueueSuccess, nil } -// queryGroupsAtSize returns groups that have achieved minimum size -func (q *ProvisionalQueue) deleteGroups(ctx context.Context, pool *pgxpool.Pool, groupNames []string) error { +// getReadyGroups gets groups that are ready for moving from provisional to pending +func (q *ProvisionalQueue) getReadyGroups(ctx context.Context, pool *pgxpool.Pool) ([]workers.JobArgs, error) { // First retrieve the group names that are the right size - query := fmt.Sprintf(queries.DeleteGroupsQuery, strings.Join(groupNames, ",")) - rows, err := pool.Query(ctx, query) + rows, err := pool.Query(ctx, queries.SelectGroupsAtSizeQuery) if err != nil { - return err + klog.Infof("getReadyGroups Error: select groups at size: %s", err) + return nil, err } defer rows.Close() - return err -} -// queryGroupsAtSize returns groups that have achieved minimum size -func (q *ProvisionalQueue) getGroupsAtSize(ctx context.Context, pool *pgxpool.Pool, groupNames []string) ([]workers.JobArgs, error) { - - // Now we need to collect all the pods that match that. - query := fmt.Sprintf(queries.SelectGroupsQuery, strings.Join(groupNames, "','")) - rows, err := pool.Query(ctx, query) + models, err := pgx.CollectRows(rows, pgx.RowToStructByName[JobModel]) if err != nil { + klog.Infof("getReadyGroups Error: collect rows for groups at size: %s", err) return nil, err } - defer rows.Close() + + // Get one representative podspec for each + var podspec string // Collect rows into map, and then slice of jobs // The map whittles down the groups into single entries @@ -122,17 +153,24 @@ func (q *ProvisionalQueue) getGroupsAtSize(ctx context.Context, pool *pgxpool.Po lookup := map[string]workers.JobArgs{} // Collect rows into single result - models, err := pgx.CollectRows(rows, pgx.RowToStructByName[JobModel]) - // TODO(vsoch) we need to collect all podspecs here and be able to give that to the worker + // Right now we just select a representative one for the entire group. for _, model := range models { + row := q.pool.QueryRow(ctx, queries.SelectRepresentativePodQuery, string(model.GroupName), string(model.Namespace)) + err = row.Scan(&podspec) + if err != nil { + klog.Errorf("Issue scanning podspec: %s", err) + return nil, err + } + klog.Infof("parsing group %s", model) jobArgs := workers.JobArgs{ GroupName: model.GroupName, - Podspec: model.Podspec, GroupSize: model.GroupSize, Duration: model.Duration, + Podspec: podspec, + Namespace: model.Namespace, } - lookup[model.GroupName] = jobArgs + lookup[model.GroupName+"-"+model.Namespace] = jobArgs } for _, jobArgs := range lookup { jobs = append(jobs, jobArgs) @@ -140,62 +178,85 @@ func (q *ProvisionalQueue) getGroupsAtSize(ctx context.Context, pool *pgxpool.Po return jobs, nil } -// This was an attmpt to combine into one query (does not work, still two!) 
-func (q *ProvisionalQueue) getGroupsReady(ctx context.Context, pool *pgxpool.Pool) ([]workers.JobArgs, []string, error) { +// deleteGroups deletes groups from the provisional table +func (q *ProvisionalQueue) deleteGroups( + ctx context.Context, + pool *pgxpool.Pool, + groups []workers.JobArgs, +) error { - groupNames := []string{} + // select based on group name and namespace, which should be unique + query := "" + for i, group := range groups { + query += fmt.Sprintf("(group_name = '%s' and namespace='%s')", group.GroupName, group.Namespace) + if i < len(groups)-1 { + query += " or " + } + } + klog.Infof("Query is %s", query) - // Refresh groups table - _, err := pool.Query(ctx, queries.RefreshGroupsQuery) + // This deletes from the single pod provisional table + queryProvisional := fmt.Sprintf(queries.DeleteGroupsQuery, query) + _, err := pool.Exec(ctx, queryProvisional) if err != nil { - return nil, groupNames, err + klog.Infof("Error with delete provisional pods %s: %s", query, err) + return err } - // Now we need to collect all the pods that match that. - rows, err := pool.Query(ctx, queries.SelectGroupsReadyQuery) + // This deletes from the group provisional table + query = fmt.Sprintf(queries.DeleteProvisionalGroupsQuery, query) + _, err = pool.Exec(ctx, query) if err != nil { - return nil, groupNames, err + klog.Infof("Error with delete groups provisional %s: %s", query, err) + return err } - defer rows.Close() - - // Collect rows into map, and then slice of jobs - // The map whittles down the groups into single entries - // We will eventually not want to do that, assuming podspecs are different in a group - jobs := []workers.JobArgs{} - lookup := map[string]workers.JobArgs{} + return err +} - // Collect rows into single result - models, err := pgx.CollectRows(rows, pgx.RowToStructByName[JobModel]) +// insertPending adds ready groups to the pending queue. A pool database connection is required, +// which comes from the main Fluxnetes queue. +func (q *ProvisionalQueue) insertPending( + ctx context.Context, + pool *pgxpool.Pool, + groups []workers.JobArgs, +) error { - // TODO(vsoch) we need to collect all podspecs here and be able to give that to the worker - for _, model := range models { - jobArgs := workers.JobArgs{GroupName: model.GroupName, Podspec: model.Podspec, GroupSize: model.GroupSize} - lookup[model.GroupName] = jobArgs + // Send in a batch + batch := &pgx.Batch{} + for _, group := range groups { + query := fmt.Sprintf(queries.InsertIntoPending, group.GroupName, group.Namespace, group.GroupSize, group.GroupName, group.Namespace) + batch.Queue(query) } - - for _, jobArgs := range lookup { - jobs = append(jobs, jobArgs) - groupNames = append(groupNames, jobArgs.GroupName) + klog.Infof("[Fluxnetes] Inserting %d groups into pending\n", len(groups)) + result := pool.SendBatch(ctx, batch) + err := result.Close() + if err != nil { + klog.Errorf("Error committing to send %d groups into pending %s", len(groups), err) } - return jobs, groupNames, nil + return err } // ReadyJobs returns jobs that are ready from the provisional table, also cleaning up func (q *ProvisionalQueue) ReadyJobs(ctx context.Context, pool *pgxpool.Pool) ([]workers.JobArgs, error) { // 1. Get the list of group names that have pod count >= their size - groupNames, err := q.queryGroupsAtSize(ctx, pool) + jobs, err := q.getReadyGroups(ctx, pool) if err != nil { return nil, err } - // 2. Now we need to collect all the pods that match that. 
- jobs, err := q.getGroupsAtSize(ctx, pool, groupNames) - if err != nil { - return nil, err - } + klog.Infof("Found %d ready groups %s", len(jobs), jobs) + if len(jobs) > 0 { - // 3. Finally, we need to delete them from the provisional table - err = q.deleteGroups(ctx, pool, groupNames) + // Move them into pending! We do this first so that we are sure the groups + // are known to be pending before we delete from provisional. + err = q.insertPending(ctx, pool, jobs) + if err != nil { + return nil, err + } + // 3. Finally, we need to delete them from the provisional tables + // If more individual pods are added, they need to be a new group + err = q.deleteGroups(ctx, pool, jobs) + } return jobs, err } diff --git a/kubernetes/pkg/fluxnetes/strategy/strategy.go b/kubernetes/pkg/fluxnetes/strategy/strategy.go index 6ee98b6..94c81f5 100644 --- a/kubernetes/pkg/fluxnetes/strategy/strategy.go +++ b/kubernetes/pkg/fluxnetes/strategy/strategy.go @@ -6,6 +6,7 @@ import ( corev1 "k8s.io/api/core/v1" groups "k8s.io/kubernetes/pkg/scheduler/framework/plugins/fluxnetes/group" + "k8s.io/kubernetes/pkg/scheduler/framework/plugins/fluxnetes/types" "github.com/jackc/pgx/v5" "github.com/jackc/pgx/v5/pgxpool" @@ -24,7 +25,7 @@ type QueueStrategy interface { // provide the entire queue to interact with Schedule(context.Context, *pgxpool.Pool, int32) ([]river.InsertManyParams, error) AddWorkers(*river.Workers) - Enqueue(context.Context, *pgxpool.Pool, *corev1.Pod, *groups.PodGroup) error + Enqueue(context.Context, *pgxpool.Pool, *corev1.Pod, *groups.PodGroup) (types.EnqueueStatus, error) PostSubmit(context.Context, *pgxpool.Pool, *river.Client[pgx.Tx]) error // Return metadata about the strategy for the Queue to know diff --git a/kubernetes/pkg/fluxnetes/strategy/workers/cleanup.go b/kubernetes/pkg/fluxnetes/strategy/workers/cleanup.go index c9c829b..303e7f8 100644 --- a/kubernetes/pkg/fluxnetes/strategy/workers/cleanup.go +++ b/kubernetes/pkg/fluxnetes/strategy/workers/cleanup.go @@ -4,6 +4,7 @@ import ( "context" "encoding/json" "fmt" + "os" "time" "github.com/jackc/pgx/v5" @@ -19,6 +20,7 @@ import ( "k8s.io/kubernetes/pkg/scheduler/framework/plugins/fluxnetes/defaults" pb "k8s.io/kubernetes/pkg/scheduler/framework/plugins/fluxnetes/fluxion-grpc" + "k8s.io/kubernetes/pkg/scheduler/framework/plugins/fluxnetes/queries" "github.com/riverqueue/river" ) @@ -77,17 +79,22 @@ func SubmitCleanup( if err != nil { return err } - if err := tx.Commit(ctx); err != nil { + err = tx.Commit(ctx) + if err != nil { return err } - klog.Infof("SUBMIT CLEANUP ending for %d", fluxID) + if fluxID < 0 { + klog.Infof("SUBMIT CLEANUP ending for unschedulable job") + } else { + klog.Infof("SUBMIT CLEANUP ending for %d", fluxID) + } return nil } // deleteObjects cleans up (deletes) Kubernetes objects // We do this before the call to fluxion so we can be sure the // cluster object resources are freed first -func deleteObjects(ctx context.Context, job *river.Job[CleanupArgs]) error { +func deleteObjects(ctx context.Context, podspec string) error { config, err := rest.InClusterConfig() if err != nil { return err @@ -99,7 +106,7 @@ func deleteObjects(ctx context.Context, job *river.Job[CleanupArgs]) error { // Serialize the podspec back to a pod var pod corev1.Pod - err = json.Unmarshal([]byte(job.Args.Podspec), &pod) + err = json.Unmarshal([]byte(podspec), &pod) if err != nil { return err } @@ -143,11 +150,26 @@ func deleteJob(ctx context.Context, namespace string, client kubernetes.Interfac // Work performs the Cancel action, first 
cancelling in Kubernetes (if needed) // and then cancelling in fluxion. func (w CleanupWorker) Work(ctx context.Context, job *river.Job[CleanupArgs]) error { - klog.Infof("[CLEANUP-WORKER-START] Cleanup (cancel) running for jobid %s", job.Args.FluxID) + + // Wrapper to actual cleanup function that can be called from elsewhere + return Cleanup(ctx, job.Args.Podspec, job.Args.FluxID, job.Args.Kubernetes, job.Args.GroupName) +} + +// Cleanup handles a call to fluxion to cancel (if appropriate) along with Kubernetes object deletion, +// and finally, deletion from Pending queue (table) to allow new jobs in +func Cleanup( + ctx context.Context, + podspec string, + fluxID int64, + inKubernetes bool, + groupName string, +) error { + + klog.Infof("[CLEANUP-START] Cleanup (cancel) running for jobid %d", fluxID) // First attempt cleanup in the cluster, only if in Kubernetes - if job.Args.Kubernetes { - err := deleteObjects(ctx, job) + if inKubernetes { + err := deleteObjects(ctx, podspec) // The job might have been deleted another way if err != nil && !errors.IsNotFound(err) { @@ -155,6 +177,47 @@ func (w CleanupWorker) Work(ctx context.Context, job *river.Job[CleanupArgs]) er } } + // We only delete from fluxion if there is a flux id + // A valid fluxID is 0 or greater + var err error + if fluxID > -1 { + err = deleteFluxion(fluxID) + if err != nil { + klog.Infof("Error issuing cancel to fluxion for group %s", groupName) + } + return err + } + + // Serialize the podspec back to a pod + var pod corev1.Pod + err = json.Unmarshal([]byte(podspec), &pod) + if err != nil { + return err + } + + // Next, delete from the pending table to allow new pods with the same group + // TODO should we allow this to continue? + pool, err := pgxpool.New(context.Background(), os.Getenv("DATABASE_URL")) + if err != nil { + klog.Errorf("Issue creating new pool %s", err) + return err + } + defer pool.Close() + + // Delete the group from the pending queue so a new group with the same name + // and namespace can be submitted later. + _, err = pool.Exec(context.Background(), queries.DeleteFromPendingQuery, groupName, pod.Namespace) + if err != nil { + klog.Infof("Error deleting Pod %s/%s from pending queue", pod.Namespace, pod.Name) + return err + } + klog.Infof("[CLEANUP-COMPLETE] for group %s (flux job id %d)", groupName, fluxID) + return nil +} + +// deleteFluxion issues a cancel to Fluxion, our scheduler +func deleteFluxion(fluxID int64) error { + // Connect to the Fluxion service. Returning an error means we retry // see: https://riverqueue.com/docs/job-retries conn, err := grpc.Dial("127.0.0.1:4242", grpc.WithInsecure()) @@ -172,7 +235,7 @@ func (w CleanupWorker) Work(ctx context.Context, job *river.Job[CleanupArgs]) er // Prepare the request to cancel // https://github.com/flux-framework/flux-sched/blob/master/resource/reapi/bindings/go/src/fluxcli/reapi_cli.go#L226 request := &pb.CancelRequest{ - FluxID: uint64(job.Args.FluxID), + FluxID: uint64(fluxID), // Don't return an error if the job id does not exist. 
See: NoExistOK: true, @@ -184,9 +247,6 @@ func (w CleanupWorker) Work(ctx context.Context, job *river.Job[CleanupArgs]) er response, err := fluxion.Cancel(fluxionCtx, request) if err != nil { klog.Errorf("[Fluxnetes] Issue with cancel %s %s", response.Error, err) - return err } - klog.Infof("[CLEANUP-WORKER-COMPLETE] for group %s (flux job id %d)", - job.Args.GroupName, job.Args.FluxID) - return nil + return err } diff --git a/kubernetes/pkg/fluxnetes/strategy/workers/job.go b/kubernetes/pkg/fluxnetes/strategy/workers/job.go index 48267e1..e64bb35 100644 --- a/kubernetes/pkg/fluxnetes/strategy/workers/job.go +++ b/kubernetes/pkg/fluxnetes/strategy/workers/job.go @@ -37,6 +37,7 @@ type JobArgs struct { GroupName string `json:"groupName"` GroupSize int32 `json:"groupSize"` Duration int32 `json:"duration"` + Namespace string `json:"namespace"` // If true, we are allowed to ask Fluxion for a reservation Reservation bool `json:"reservation"` @@ -128,6 +129,12 @@ func (w JobWorker) Work(ctx context.Context, job *river.Job[JobArgs]) error { defer rRows.Close() } + // Not reserved AND not allocated indicates not possible + if !response.Reserved && !response.Allocated { + errorMessage := fmt.Sprintf("Fluxion could not allocate nodes for %s, likely Unsatisfiable", job.Args.GroupName) + klog.Info(errorMessage) + return river.JobCancel(fmt.Errorf(errorMessage)) + } // This means we didn't get an allocation - we might have a reservation (to do // something with later) but for now we just print it. if !response.Allocated { @@ -155,6 +162,8 @@ func (w JobWorker) Work(ctx context.Context, job *river.Job[JobArgs]) error { // Kick off a cleaning job for when everyting should be cancelled, but only if // there is a deadline set. We can't set a deadline for services, etc. + // This is here instead of responding to deletion / termination since a job might + // run longer than the duration it is allowed. 
if job.Args.Duration > 0 { err = SubmitCleanup(ctx, pool, pod.Spec.ActiveDeadlineSeconds, job.Args.Podspec, int64(fluxID), true, []string{}) if err != nil { return err } diff --git a/kubernetes/pkg/fluxnetes/types/types.go b/kubernetes/pkg/fluxnetes/types/types.go new file mode 100644 index 0000000..8dbde65 --- /dev/null +++ b/kubernetes/pkg/fluxnetes/types/types.go @@ -0,0 +1,20 @@ +package types + +// EnqueueStatus is returned by the provisional enqueue to provide context +// to the calling queue about what action to take +type EnqueueStatus int + +const ( + // PodEnqueueSuccess means the pod was added to provisional (and the group to group provisional) + PodEnqueueSuccess EnqueueStatus = iota + 1 + + // The group has already been moved into pending (and submitted, but not complete) + // and we do not accept new pods for the group + GroupAlreadyInPending + + // The pod is invalid (podspec cannot serialize, etc) and should be discarded + PodInvalid + + // Unknown means some other error happened (usually not related to pod) + Unknown +) diff --git a/kubernetes/pkg/scheduler/schedule_one.go b/kubernetes/pkg/scheduler/schedule_one.go index 532a19d..b134b48 100644 --- a/kubernetes/pkg/scheduler/schedule_one.go +++ b/kubernetes/pkg/scheduler/schedule_one.go @@ -38,7 +38,9 @@ import ( "k8s.io/kubernetes/pkg/apis/core/validation" "k8s.io/kubernetes/pkg/scheduler/framework" "k8s.io/kubernetes/pkg/scheduler/framework/parallelize" + "k8s.io/kubernetes/pkg/scheduler/framework/plugins/fluxnetes/types" internalqueue "k8s.io/kubernetes/pkg/scheduler/internal/queue" + "k8s.io/kubernetes/pkg/scheduler/metrics" "k8s.io/kubernetes/pkg/scheduler/util" utiltrace "k8s.io/utils/trace" @@ -67,39 +69,75 @@ func (sched *Scheduler) ScheduleOne(ctx context.Context) { logger := klog.FromContext(ctx) podInfo, err := sched.NextPod(logger) if err != nil { + klog.Infof("Error getting next pod from queue %s", err) logger.Error(err, "Error while retrieving next pod from scheduling queue") return } // pod could be nil when schedulerQueue is closed if podInfo == nil || podInfo.Pod == nil { + klog.Info("Podinfo is nil") return } pod := podInfo.Pod + //assumedPodInfo := podInfo.DeepCopy() + // TODO(knelasevero): Remove duplicated keys from log entry calls // When contextualized logging hits GA // https://github.com/kubernetes/kubernetes/issues/111672 logger = klog.LoggerWithValues(logger, "pod", klog.KObj(pod)) ctx = klog.NewContext(ctx, logger) - logger.V(1).Info("This is a custom message for fluxnetes", "pod", klog.KObj(pod)) - logger.V(4).Info("About to try and schedule pod", "pod", klog.KObj(pod)) + klog.Infof("Getting framework for pod %s/%s", pod.Namespace, pod.Name) fwk, err := sched.frameworkForPod(pod) if err != nil { // This shouldn't happen, because we only accept for scheduling the pods // which specify a scheduler name that matches one of the profiles. 
+ klog.Infof("Error getting framework for pod %s/%s", pod.Namespace, pod.Name) logger.Error(err, "Error occurred") return } + klog.Infof("Skipping pod schedule of %s/%s", pod.Namespace, pod.Name) if sched.skipPodSchedule(ctx, fwk, pod) { return } // Add the pod to the provisional queue - err = sched.Queue.Enqueue(pod) + klog.Infof("Running enqueue for pod %s/%s", pod.Namespace, pod.Name) + // start := time.Now() + enqueueStatus, err := sched.Queue.Enqueue(pod) if err != nil { + klog.Infof("Enqueue for pod %s/%s was NOT successful: %s", pod.Namespace, pod.Name, err) logger.Error(err, "Issue with fluxnetes Enqueue") } + klog.Infof("Enqueue for pod %s/%s was successful", pod.Namespace, pod.Name) + + // If we cannot schedule "unsatisfiable" we delete + deletePod := false + + // If the group is already in pending we reject it. We do not + // currently support expanding groups that are undergoing processing, unless + // it is an explicit update to an object (TBA). + if enqueueStatus == types.GroupAlreadyInPending { + klog.Infof("Pod %s/%s has group already in pending queue, rejecting.", pod.Namespace, pod.Name) + // status := framework.NewStatus(framework.UnschedulableAndUnresolvable, "pod group is actively in pending and cannot be changed") + // sched.FailureHandler(ctx, fwk, assumedPodInfo, status, clearNominatedNode, start) + deletePod = true + + } else if enqueueStatus == types.PodInvalid { + klog.Infof("Pod %s/%s is invalid or erroneous, rejecting.", pod.Namespace, pod.Name) + // status := framework.NewStatus(framework.UnschedulableAndUnresolvable, "pod is invalid or unable to be scheduled") + // sched.FailureHandler(ctx, fwk, assumedPodInfo, status, clearNominatedNode, start) + deletePod = true + + } else if enqueueStatus == types.PodEnqueueSuccess { + // TODO but this should only happen once. + klog.Infof("Pod %s/%s was added to the provisional table", pod.Namespace, pod.Name) + + // This is usually a database error or similar + } else if enqueueStatus == types.Unknown { + klog.Infof("There was an unknown error for pod %s/%s and we should have gotten err and not get here.", pod.Namespace, pod.Name) + } // TODO(vsoch): the schedulingCycle should be run here, and we should save everything we need for either // the bindingCycle directly, OR information to pass to fluxion (nodes to skip, resources needed, etc) @@ -111,7 +149,16 @@ func (sched *Scheduler) ScheduleOne(ctx context.Context) { logger.Error(err, "Issue with fluxnetes Schedule") } - // TODO remove pod from active queue, we have in Fluxernetes provisional queue now + // Remove from scheduling queue so it does not come around again + // TODO(vsoch) need kubectl command again to list groups! + sched.SchedulingQueue.Done(pod.UID) + if deletePod { + klog.Infof("Cannot schedule pod, removing from queue.") + err = sched.SchedulingQueue.Delete(pod) + if err != nil { + logger.Error(err, "Issue deleting pod from queues") + } + } } var clearNominatedNode = &framework.NominatingInfo{NominatingMode: framework.ModeOverride, NominatedNodeName: ""} @@ -172,7 +219,7 @@ func (sched *Scheduler) schedulingCycle( metrics.SchedulingAlgorithmLatency.Observe(metrics.SinceInSeconds(start)) - // NOTE: when refactored, this needs to be given the node from fluence + // NOTE: when refactored, this needs to be given the node from fluxion // Right now we get a message about mismatch between scheduled and assumed // Tell the cache to assume that a pod now is running on a given node, even though it hasn't been bound yet. 
// This allows us to keep scheduling without waiting on binding to occur. diff --git a/kubernetes/pkg/scheduler/scheduler.go b/kubernetes/pkg/scheduler/scheduler.go index d70f950..9d1fc08 100644 --- a/kubernetes/pkg/scheduler/scheduler.go +++ b/kubernetes/pkg/scheduler/scheduler.go @@ -479,14 +479,12 @@ func (sched *Scheduler) Run(ctx context.Context) { defer sched.Queue.Pool.Close() // Get and run the informer (update, delete pod events) - go sched.Queue.GetInformer().Run(ctx.Done()) + //go sched.Queue.GetInformer().Run(ctx.Done()) // Make an empty state for now, just for functions state := framework.NewCycleState() - // TODO try putting this into scheduleOne context to see if events not dropped? - // Setup a function to fire when a job event happens - // We probably want to add sched functions here + // TODO(vsoch): better organize logic in here once more final // Note that we can see queue stats here too: // https://github.com/riverqueue/river/blob/master/event.go#L67-L71 waitForJob := func(subscribeChan <-chan *river.Event) { @@ -502,15 +500,16 @@ func (sched *Scheduler) Run(ctx context.Context) { // The function is added (to work on next) but not yet implemented how // that is going to work klog.Infof("Job Event Received: %s", event.Job.Kind) - if event.Job.Kind == "cleanup" { + if event.Job.Kind == fluxnetes.CleanupQueue { continue } - // TODO: possibly filter to queue name or similar - // https://github.com/riverqueue/river/blob/master/riverdriver/riverpgxv5/migration/main/002_initial_schema.up.sql#L1-L9 // Parse event result into type args := fluxnetes.JobResult{} + // TODO(vsoch): if we care about this, get from original schedule + start := time.Now() + // We only care about job results to further process (not cleanup) err = json.Unmarshal(event.Job.EncodedArgs[:], &args) if err != nil { @@ -518,10 +517,34 @@ func (sched *Scheduler) Run(ctx context.Context) { } nodes := args.GetNodes() + // A cancel means we cannot satisfy, handle the failure + if event.Job.State == "cancelled" { + + // TODO need to cancel the rest in the group? + klog.Infof("Received a CANCEL event - the pod cannot be scheduled.") + var pod v1.Pod + err := json.Unmarshal([]byte(args.PodSpec), &pod) + if err != nil { + klog.Errorf("Podspec unmarshal error: %s", err) + } + klog.Infof("Cannot schedule pod %s/%s, removing from queue.", pod.Namespace, pod.Name) + err = sched.SchedulingQueue.Delete(&pod) + if err != nil { + logger.Error(err, "Deleting pod from queues") + } + + // Remove from cluster + // TODO add GroupName to JobResult if we want to print it here + sched.Queue.Cleanup(&pod, args.PodSpec, "") + return + } + + // NOTE: fluxion currently just gives back nodes, and then tasks. + // This means that we need to get the pods for the group, and then + // assign based on tasks. We could also honor the assignment per + // podspec, and I need to think of how to do that. 
TBA if len(nodes) > 0 { - // TODO(vsoch): if we care about this, get from original schedule - start := time.Now() podsToActivate := framework.NewPodsToActivate() klog.Infof("Got job with state %s and nodes: %s\n", event.Job.State, nodes) diff --git a/src/build/postgres/create-tables.sql b/src/build/postgres/create-tables.sql index ecfd69a..2a4f42c 100644 --- a/src/build/postgres/create-tables.sql +++ b/src/build/postgres/create-tables.sql @@ -4,15 +4,34 @@ CREATE TABLE pods_provisional ( name TEXT NOT NULL, duration INTEGER NOT NULL, created_at timestamptz NOT NULL default NOW(), + group_name TEXT NOT NULL +); +CREATE UNIQUE INDEX group_name_index ON pods_provisional (group_name, namespace, name); + +-- A single row for each group +CREATE TABLE groups_provisional ( + podspec TEXT NOT NULL, + namespace TEXT NOT NULL, + duration INTEGER NOT NULL, + created_at timestamptz NOT NULL default NOW(), group_name TEXT NOT NULL, - group_size INTEGER NOT NULL + group_size INTEGER NOT NULL, + current_size INTEGER NOT NULL ); -CREATE INDEX group_name_index ON pods_provisional (group_name); +CREATE UNIQUE INDEX groups_provisional_index ON groups_provisional (group_name, namespace); --- REFRESH MATERIALIZED VIEW group_sizes; -CREATE materialized view groups_size as SELECT count(*), group_name FROM pods_provisional group by group_name; -- We only need the fluxid for a reservation CREATE TABLE reservations ( group_name TEXT NOT NULL, flux_id INTEGER NOT NULL ); +-- Pods get moved from provisional to pending as group objects +-- The pending queue includes groups in the pending state (still waiting to run or running) +CREATE TABLE pending_queue ( + group_name TEXT NOT NULL, + namespace TEXT NOT NULL, + group_size INTEGER NOT NULL, + flux_id INTEGER +); + -- Don't allow inserting the same group name / namespace twice +CREATE UNIQUE INDEX pending_key ON pending_queue(group_name, namespace); \ No newline at end of file diff --git a/src/fluxnetes/cmd/main.go b/src/fluxnetes/cmd/main.go index 0a875ae..aba7463 100644 --- a/src/fluxnetes/cmd/main.go +++ b/src/fluxnetes/cmd/main.go @@ -68,9 +68,11 @@ func main() { } fmt.Printf("[GRPCServer] gRPC Listening on %s\n", lis.Addr().String()) - if err := server.Serve(lis); err != nil { + err = server.Serve(lis) + if err != nil { fmt.Printf("[GRPCServer] failed to serve: %v\n", err) } + flux.Close() fmt.Printf("[GRPCServer] Exiting\n") } diff --git a/src/fluxnetes/pkg/fluxion/fluxion.go b/src/fluxnetes/pkg/fluxion/fluxion.go index 7a49c97..eb13188 100644 --- a/src/fluxnetes/pkg/fluxion/fluxion.go +++ b/src/fluxnetes/pkg/fluxion/fluxion.go @@ -44,6 +44,11 @@ func (fluxion *Fluxion) InitFluxion(policy, label string) { fluxion.cli.InitContext(string(jgf), p) } +// Close properly closes (destroys) the fluxion client handle +func (fluxion *Fluxion) Close() { + fluxion.cli.Destroy() +} + // Cancel wraps the Cancel function of the fluxion go bindings func (fluxion *Fluxion) Cancel( ctx context.Context,
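A small usage note on the new `Fluxion.Close` wrapper above: main.go currently calls `flux.Close()` only after `server.Serve` returns. A minimal sketch of an alternative, assuming the same `flux` handle and the `InitFluxion(policy, label)` signature shown in this diff (the construction line is assumed, not copied from main.go), would defer the close right after initialization so the underlying `cli.Destroy()` also runs on early error paths; this is a suggestion, not what the patch does.

```go
// Sketch only (not the patch's behavior): release the fluxion handle via defer
// so Destroy runs even if serving fails or we return early.
flux := &fluxion.Fluxion{} // assumed construction; stands in for the handle used in main.go
flux.InitFluxion(policy, label)
defer flux.Close()

if err := server.Serve(lis); err != nil {
	fmt.Printf("[GRPCServer] failed to serve: %v\n", err)
}
```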