Skip to content

Commit

Permalink
Merge pull request #44 from googs1025/fix_assert
Browse files Browse the repository at this point in the history
fix: handle pod assert err
  • Loading branch information
NickrenREN authored May 7, 2024
2 parents a738785 + ecd7478 commit 5035528
Show file tree
Hide file tree
Showing 4 changed files with 30 additions and 56 deletions.
64 changes: 14 additions & 50 deletions pkg/binder/binder_unit.go
Original file line number Diff line number Diff line change
Expand Up @@ -267,79 +267,43 @@ func (unitInfo *bindingUnitInfo) GetNewTasksOnNode(nodeName string) []*runningUn
return result
}

func (unitInfo *bindingUnitInfo) GetReadyAndWaitingTasks() []*checkResult {
unitInfo.mu.Lock()
defer unitInfo.mu.Unlock()

len := len(unitInfo.readyTasks) + len(unitInfo.waitingTasks)
if len == 0 {
func (unitInfo *bindingUnitInfo) getTasks(tasks map[types.UID]*checkResult) []*checkResult {
length := len(tasks)
if length == 0 {
return nil
}

result := make([]*checkResult, len)
result := make([]*checkResult, length)
i := 0
for _, cr := range unitInfo.readyTasks {
result[i] = cr
i++
}
for _, cr := range unitInfo.waitingTasks {
for _, cr := range tasks {
result[i] = cr
i++
}
return result
}

func (unitInfo *bindingUnitInfo) GetReadyTasks() []*checkResult {
func (unitInfo *bindingUnitInfo) GetReadyAndWaitingTasks() []*checkResult {
unitInfo.mu.Lock()
defer unitInfo.mu.Unlock()
return append(unitInfo.getTasks(unitInfo.readyTasks), unitInfo.getTasks(unitInfo.waitingTasks)...)
}

len := len(unitInfo.readyTasks)
if len == 0 {
return nil
}

result := make([]*checkResult, len)
i := 0
for _, cr := range unitInfo.readyTasks {
result[i] = cr
i++
}
return result
func (unitInfo *bindingUnitInfo) GetReadyTasks() []*checkResult {
unitInfo.mu.Lock()
defer unitInfo.mu.Unlock()
return unitInfo.getTasks(unitInfo.readyTasks)
}

func (unitInfo *bindingUnitInfo) GetWaitingTasks() []*checkResult {
unitInfo.mu.Lock()
defer unitInfo.mu.Unlock()

len := len(unitInfo.waitingTasks)
if len == 0 {
return nil
}

result := make([]*checkResult, len)
i := 0
for _, cr := range unitInfo.waitingTasks {
result[i] = cr
i++
}
return result
return unitInfo.getTasks(unitInfo.waitingTasks)
}

func (unitInfo *bindingUnitInfo) GetFailedTasks() []*checkResult {
unitInfo.mu.Lock()
defer unitInfo.mu.Unlock()

len := len(unitInfo.failedTasks)
if len == 0 {
return nil
}
result := make([]*checkResult, len)
i := 0
for _, cr := range unitInfo.failedTasks {
result[i] = cr
i++
}
return result
return unitInfo.getTasks(unitInfo.failedTasks)
}

// check if there is at lease one preemptor in new tasks
Expand Down
4 changes: 2 additions & 2 deletions pkg/binder/queue/binder_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -349,7 +349,7 @@ func (p *PriorityQueue) updatePodInWaitingList(oldPod, newPod *v1.Pod) error {
p.waitingPodsList[oldPod.UID] = updatePod(oldPodInfo, newPod)
return nil
}
return fmt.Errorf("pod %s/%s not found in waiting list.", oldPod.Namespace, oldPod.Name)
return fmt.Errorf("pod %s/%s not found in waiting list", oldPod.Namespace, oldPod.Name)
}

func (p *PriorityQueue) deletePodInWaitingList(pod *v1.Pod) error {
Expand Down Expand Up @@ -538,7 +538,7 @@ func (p *PriorityQueue) checkWaitingPod() {
}
}

// MakeNextPodFunc returns a function to retrieve the next pod from a given
// MakeNextUnitFunc returns a function to retrieve the next pod from a given
// active queue
func MakeNextUnitFunc(queue BinderQueue) func() *framework.QueuedUnitInfo {
return func() *framework.QueuedUnitInfo {
Expand Down
16 changes: 13 additions & 3 deletions pkg/scheduler/queue/internal_waitingpods.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,15 +141,22 @@ func (w *waitingPodsList) removeExistedOrCreateWaitingPods(pod *v1.Pod) *waiting
}

func (w *waitingPodsList) Add(obj interface{}) error {
pod := obj.(*v1.Pod)
pod, ok := obj.(*v1.Pod)
if !ok {
return fmt.Errorf("failed to convert obj %T to *v1.Pod", obj)
}
podInfo := newQueuedPodInfo(pod, w.clock)
wp := w.removeExistedOrCreateWaitingPods(pod)
wp.pods[pod.UID] = podInfo
return w.SubQueue.Add(wp)
}

func (w *waitingPodsList) Update(oldObj, newObj interface{}) error {
oldPod, newPod := oldObj.(*v1.Pod), newObj.(*v1.Pod)
oldPod, ok1 := oldObj.(*v1.Pod)
newPod, ok2 := newObj.(*v1.Pod)
if !ok1 || !ok2 {
return fmt.Errorf("failed to convert oldObj %T, newObj %T to *v1.Pod", oldObj, newObj)
}
// We assume that the oldPod and the newPod belong to a same unit.
wp := w.removeExistedOrCreateWaitingPods(newPod)
if oldPodInfo, ok := wp.pods[oldPod.UID]; ok {
Expand All @@ -162,7 +169,10 @@ func (w *waitingPodsList) Update(oldObj, newObj interface{}) error {
}

func (w *waitingPodsList) Delete(obj interface{}) error {
pod := obj.(*v1.Pod)
pod, ok := obj.(*v1.Pod)
if !ok {
return fmt.Errorf("failed to convert obj %T to *v1.Pod", obj)
}
wp := w.removeExistedOrCreateWaitingPods(pod)
delete(wp.pods, pod.UID)
if len(wp.pods) > 0 {
Expand Down
2 changes: 1 addition & 1 deletion pkg/util/pod/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -649,7 +649,7 @@ func ConvertToPod(obj interface{}) (*v1.Pod, error) {
var pod *v1.Pod
switch t := obj.(type) {
case *v1.Pod:
pod = obj.(*v1.Pod)
pod = t
case cache.DeletedFinalStateUnknown:
var ok bool
pod, ok = t.Obj.(*v1.Pod)
Expand Down

0 comments on commit 5035528

Please sign in to comment.