Skip to content

Commit

Permalink
fix: handle pod assert err
Browse files Browse the repository at this point in the history
  • Loading branch information
googs1025 committed May 3, 2024
1 parent dcfb068 commit 3a58d0a
Show file tree
Hide file tree
Showing 4 changed files with 35 additions and 58 deletions.
71 changes: 19 additions & 52 deletions pkg/binder/binder_unit.go
Original file line number Diff line number Diff line change
Expand Up @@ -267,79 +267,46 @@ func (unitInfo *bindingUnitInfo) GetNewTasksOnNode(nodeName string) []*runningUn
return result
}

func (unitInfo *bindingUnitInfo) GetReadyAndWaitingTasks() []*checkResult {
func (unitInfo *bindingUnitInfo) getTasks(tasks map[types.UID]*checkResult) []*checkResult {
unitInfo.mu.Lock()
defer unitInfo.mu.Unlock()

len := len(unitInfo.readyTasks) + len(unitInfo.waitingTasks)
if len == 0 {
length := len(tasks)
if length == 0 {
return nil
}

result := make([]*checkResult, len)
result := make([]*checkResult, length)
i := 0
for _, cr := range unitInfo.readyTasks {
result[i] = cr
i++
}
for _, cr := range unitInfo.waitingTasks {
for _, cr := range tasks {
result[i] = cr
i++
}
return result
}

func (unitInfo *bindingUnitInfo) GetReadyTasks() []*checkResult {
unitInfo.mu.Lock()
defer unitInfo.mu.Unlock()

len := len(unitInfo.readyTasks)
if len == 0 {
return nil
func (unitInfo *bindingUnitInfo) GetReadyAndWaitingTasks() []*checkResult {
allTasks := make(map[types.UID]*checkResult)
for uid, cr := range unitInfo.readyTasks {
allTasks[uid] = cr
}

result := make([]*checkResult, len)
i := 0
for _, cr := range unitInfo.readyTasks {
result[i] = cr
i++
for uid, cr := range unitInfo.waitingTasks {
allTasks[uid] = cr
}
return result
}

func (unitInfo *bindingUnitInfo) GetWaitingTasks() []*checkResult {
unitInfo.mu.Lock()
defer unitInfo.mu.Unlock()
return unitInfo.getTasks(allTasks)
}

len := len(unitInfo.waitingTasks)
if len == 0 {
return nil
}
func (unitInfo *bindingUnitInfo) GetReadyTasks() []*checkResult {
return unitInfo.getTasks(unitInfo.readyTasks)
}

result := make([]*checkResult, len)
i := 0
for _, cr := range unitInfo.waitingTasks {
result[i] = cr
i++
}
return result
func (unitInfo *bindingUnitInfo) GetWaitingTasks() []*checkResult {
return unitInfo.getTasks(unitInfo.waitingTasks)
}

func (unitInfo *bindingUnitInfo) GetFailedTasks() []*checkResult {
unitInfo.mu.Lock()
defer unitInfo.mu.Unlock()

len := len(unitInfo.failedTasks)
if len == 0 {
return nil
}
result := make([]*checkResult, len)
i := 0
for _, cr := range unitInfo.failedTasks {
result[i] = cr
i++
}
return result
return unitInfo.getTasks(unitInfo.failedTasks)
}

// check if there is at lease one preemptor in new tasks
Expand Down
4 changes: 2 additions & 2 deletions pkg/binder/queue/binder_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -349,7 +349,7 @@ func (p *PriorityQueue) updatePodInWaitingList(oldPod, newPod *v1.Pod) error {
p.waitingPodsList[oldPod.UID] = updatePod(oldPodInfo, newPod)
return nil
}
return fmt.Errorf("pod %s/%s not found in waiting list.", oldPod.Namespace, oldPod.Name)
return fmt.Errorf("pod %s/%s not found in waiting list", oldPod.Namespace, oldPod.Name)
}

func (p *PriorityQueue) deletePodInWaitingList(pod *v1.Pod) error {
Expand Down Expand Up @@ -538,7 +538,7 @@ func (p *PriorityQueue) checkWaitingPod() {
}
}

// MakeNextPodFunc returns a function to retrieve the next pod from a given
// MakeNextUnitFunc returns a function to retrieve the next pod from a given
// active queue
func MakeNextUnitFunc(queue BinderQueue) func() *framework.QueuedUnitInfo {
return func() *framework.QueuedUnitInfo {
Expand Down
16 changes: 13 additions & 3 deletions pkg/scheduler/queue/internal_waitingpods.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,15 +141,22 @@ func (w *waitingPodsList) removeExistedOrCreateWaitingPods(pod *v1.Pod) *waiting
}

func (w *waitingPodsList) Add(obj interface{}) error {
pod := obj.(*v1.Pod)
pod, ok := obj.(*v1.Pod)
if !ok {
return fmt.Errorf("failed to convert obj %T to *v1.Pod", obj)
}
podInfo := newQueuedPodInfo(pod, w.clock)
wp := w.removeExistedOrCreateWaitingPods(pod)
wp.pods[pod.UID] = podInfo
return w.SubQueue.Add(wp)
}

func (w *waitingPodsList) Update(oldObj, newObj interface{}) error {
oldPod, newPod := oldObj.(*v1.Pod), newObj.(*v1.Pod)
oldPod, ok1 := oldObj.(*v1.Pod)
newPod, ok2 := newObj.(*v1.Pod)
if !ok1 || !ok2 {
return fmt.Errorf("failed to convert oldObj %T, newObj %T to *v1.Pod", oldObj, newObj)
}
// We assume that the oldPod and the newPod belong to a same unit.
wp := w.removeExistedOrCreateWaitingPods(newPod)
if oldPodInfo, ok := wp.pods[oldPod.UID]; ok {
Expand All @@ -162,7 +169,10 @@ func (w *waitingPodsList) Update(oldObj, newObj interface{}) error {
}

func (w *waitingPodsList) Delete(obj interface{}) error {
pod := obj.(*v1.Pod)
pod, ok := obj.(*v1.Pod)
if !ok {
return fmt.Errorf("failed to convert obj %T to *v1.Pod", obj)
}
wp := w.removeExistedOrCreateWaitingPods(pod)
delete(wp.pods, pod.UID)
if len(wp.pods) > 0 {
Expand Down
2 changes: 1 addition & 1 deletion pkg/util/pod/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -649,7 +649,7 @@ func ConvertToPod(obj interface{}) (*v1.Pod, error) {
var pod *v1.Pod
switch t := obj.(type) {
case *v1.Pod:
pod = obj.(*v1.Pod)
pod = t
case cache.DeletedFinalStateUnknown:
var ok bool
pod, ok = t.Obj.(*v1.Pod)
Expand Down

0 comments on commit 3a58d0a

Please sign in to comment.