Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

dispatcher stu #72

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
72 changes: 48 additions & 24 deletions pkg/dispatcher/dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package dispatcher
import (
"context"
"fmt"
"k8s.io/klog/v2"
"time"

scheduling "github.com/kubewharf/godel-scheduler-api/pkg/apis/scheduling/v1alpha1"
Expand All @@ -27,20 +28,6 @@ import (
schedulinginformer "github.com/kubewharf/godel-scheduler-api/pkg/client/informers/externalversions/scheduling/v1alpha1"
nodelister "github.com/kubewharf/godel-scheduler-api/pkg/client/listers/node/v1alpha1"
schedulinglister "github.com/kubewharf/godel-scheduler-api/pkg/client/listers/scheduling/v1alpha1"
v1 "k8s.io/api/core/v1"
apierrs "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/util/wait"
utilfeature "k8s.io/apiserver/pkg/util/feature"
coreinformers "k8s.io/client-go/informers/core/v1"
schedinformers "k8s.io/client-go/informers/scheduling/v1"
"k8s.io/client-go/kubernetes"
listerv1 "k8s.io/client-go/listers/core/v1"
schedulingv1 "k8s.io/client-go/listers/scheduling/v1"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/events"
"k8s.io/klog/v2"

"github.com/kubewharf/godel-scheduler/pkg/dispatcher/internal/queue"
"github.com/kubewharf/godel-scheduler/pkg/dispatcher/internal/store"
"github.com/kubewharf/godel-scheduler/pkg/dispatcher/metrics"
Expand All @@ -53,6 +40,18 @@ import (
podutil "github.com/kubewharf/godel-scheduler/pkg/util/pod"
"github.com/kubewharf/godel-scheduler/pkg/util/tracing"
unitutil "github.com/kubewharf/godel-scheduler/pkg/util/unit"
v1 "k8s.io/api/core/v1"
apierrs "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/util/wait"
utilfeature "k8s.io/apiserver/pkg/util/feature"
coreinformers "k8s.io/client-go/informers/core/v1"
schedinformers "k8s.io/client-go/informers/scheduling/v1"
"k8s.io/client-go/kubernetes"
listerv1 "k8s.io/client-go/listers/core/v1"
schedulingv1 "k8s.io/client-go/listers/scheduling/v1"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/events"
)

const (
Expand All @@ -64,6 +63,7 @@ type Dispatcher struct {
client kubernetes.Interface
podLister listerv1.PodLister

// 离线 job 批调度相关
// TODO: move to policy manager
// UnitManager contains pending pods belonging to scheduling units.
UnitInfos queue.UnitInfos
Expand All @@ -77,6 +77,7 @@ type Dispatcher struct {
// SortedPodsQueue stores pending pods that have already been sorted
// based on their configured ordering policy. When dispatching pods to
// scheduler, dispatcher will pop pods from this queue.
// SortedPodsQueue 存储已经根据其配置的排序策略进行排序的待处理 Pod。在将 Pod 分发给调度器时,dispatcher 将从此队列中弹出 Pod。
SortedPodsQueue queue.SortedQueue

DispatchInfo store.DispatchInfo
Expand All @@ -96,6 +97,7 @@ type Dispatcher struct {

// SchedulerName here is the higher level scheduler name, which is used to select pods
// that godel schedulers should be responsible for and filter out irrelevant pods.
// SchedulerName 这里是更高层次的调度器名称,用于选择 godel 调度器应该负责的 pods,并过滤掉不相关的 pods。
SchedulerName string

recorder events.EventRecorder
Expand All @@ -116,9 +118,13 @@ func New(
) *Dispatcher {
metrics.Register()

// Scheduler Maintainer: 主要负责对 Scheduler 实例状态进行维护,包括 Scheduler 实例健康状况、负载情况、Partition 节点数等.
// Node Shuffler: 主要负责基于 Scheduler 实例个数,对集群节点进行 Partition 分片。
maintainer := schemaintainer.NewSchedulerMaintainer(crdClient, schedulerInformer.Lister())
shuffler := nodeshuffler.NewNodeShuffler(client, crdClient, nodeInformer.Lister(), nmNodeInformer.Lister(), schedulerInformer.Lister(), maintainer)

// Dispatcher 主要负责应用排队,应用分发,节点分区等工作
// 它主要由几个部分构成:Sorting Policy Manager、Dispatching Policy Manager、Node Shuffler、Scheduler Maintainer 和 Reconciler。
dispatcher := &Dispatcher{
StopEverything: stopCh,
client: client,
Expand All @@ -142,11 +148,12 @@ func New(
recorder: recorder,
}

// 主要负责周期性的检查 Pod、Node、Scheduler、SchedulingUnit 等状态,修正错误状态,查漏补缺。
reconciler := reconciler.NewPodStateReconciler(client, podInformer.Lister(), nodeInformer.Lister(),
schedulerInformer.Lister(), nmNodeInformer.Lister(), schedulerName, dispatcher.DispatchInfo, maintainer)

dispatcher.reconciler = reconciler

// 为 dispatcher 添加各种事件处理函数
AddAllEventHandlers(dispatcher, podInformer, schedulerInformer, nodeInformer, nmNodeInformer, podGroupInformer)
go func() {
<-dispatcher.StopEverything
Expand All @@ -156,29 +163,30 @@ func New(
return dispatcher
}

// todo: event handlers and sort pods
func (d *Dispatcher) Run(ctx context.Context) {
// TODO: move to policy manager
go d.UnitInfos.Run(d.StopEverything)
go d.UnitInfos.Run(d.StopEverything) // 判断 podGroup 中的 pods 是否可以发送,如果可以发送,则添加到 ready queue 中

// TODO: sending sorted pods to scheduler in parallel if necessary
// TODO: adaptive worker threads count
go wait.UntilWithContext(ctx, d.sortedLoop, 0)
go wait.UntilWithContext(ctx, d.sortedLoop, 0) // 遍历 SortedPodsQueue,并起 goroutine dispatch

/*
go wait.UntilWithContext(ctx, d.dispatchLoop, 0)
go wait.UntilWithContext(ctx, d.bindLoop, 0)
*/

go d.maintainer.Run(d.StopEverything)
go d.maintainer.Run(d.StopEverything) // 主要负责对 Scheduler 实例状态进行维护,包括 Scheduler 实例健康状况、负载情况、Partition 节点数等

if utilfeature.DefaultFeatureGate.Enabled(features.DispatcherNodeShuffle) {
go d.shuffler.Run(d.StopEverything)
go d.shuffler.Run(d.StopEverything) // 更新节点和 scheduler 的对应关系,并平衡 Scheduler 实例上节点数量
}

go wait.UntilWithContext(ctx, d.pendingLoop, 0)
go wait.UntilWithContext(ctx, d.pendingUnitPodsLoop, 0)
go wait.UntilWithContext(ctx, d.pendingLoop, 0) // 将 FIFOPendingPodsQueue 的 pod 放到 sorted queue 中
go wait.UntilWithContext(ctx, d.pendingUnitPodsLoop, 0) // 将 UnitInfos 的 Pod 添加到 FIFOPendingPodsQueue(或policy manager) 中。

go d.reconciler.Run(d.StopEverything)
go d.reconciler.Run(d.StopEverything) // 主要负责周期性的检查 Pod、Node、Scheduler、SchedulingUnit 等状态,修正错误状态,查漏补缺。
}

// pendingUnitPodsLoop adds pods belonging to dispatchable units to the policy
Expand Down Expand Up @@ -268,6 +276,7 @@ func (d *Dispatcher) dispatchingPod(ctx context.Context, podInfo *queue.QueuedPo
go span.Finish()
}()

// 为 pod 选择一个 scheduler
schedulerName, err := d.selectScheduler(pod)
if err != nil {
klog.InfoS("Failed to select the scheduler", "err", err)
Expand All @@ -281,6 +290,10 @@ func (d *Dispatcher) dispatchingPod(ctx context.Context, podInfo *queue.QueuedPo
metrics.PodDispatched(helper.SinceInSeconds(start))

start = time.Now()

// patch pod
// "godel.bytedance.com/selected-scheduler": schedulerName
// "godel.bytedance.com/pod-state": "dispatched"
if err := d.sendPodToScheduler(pod, podInfo, schedulerName); err != nil {
// TODO: need to parse error in order to avoid pod ping-pong because of "resource too old" error
klog.InfoS("Failed to send pod to the scheduler", "err", err)
Expand Down Expand Up @@ -351,13 +364,15 @@ func (d *Dispatcher) getAssignedSchedulerFromPods(pg *scheduling.PodGroup) (stri
}

func (d *Dispatcher) getAssignedScheduler(pg *scheduling.PodGroup) (string, error) {
// 从 UnitInfos 中获取 podGroup 的 scheduler
cachedSched := d.UnitInfos.GetAssignedSchedulerForPodGroupUnit(pg)
if cachedSched != "" {
return cachedSched, nil
}

// in case of master/backup switch for HA, we will try to get the assigned
// scheduler name from dispatched/assumed pods.
// list podGroup 下的 pods,从 pod 的 annotation 中获取 scheduler 的名字
schedName, err := d.getAssignedSchedulerFromPods(pg)
if err != nil {
return "", err
Expand All @@ -375,11 +390,14 @@ func (d *Dispatcher) getAssignedScheduler(pg *scheduling.PodGroup) (string, erro
// selectSchedulerForUnit selects a scheduler for the podgroup, if the
// dispatcher has already assigned a scheduler to the podgroup, then returns
// the existing one.
// selectSchedulerForUnit 为 podgroup 选择一个 scheduler,如果 dispatcher 已为 podgroup 分配了 scheduler,则返回现有的 scheduler。
func (d *Dispatcher) selectSchedulerForUnit(pg *scheduling.PodGroup, pod *v1.Pod, podOwner string) (string, error) {
schedName, err := d.getAssignedScheduler(pg)
if err != nil {
return "", err
}

// scheduler name 不为空 && scheduler exist && scheduler is active,返回 scheduler name
if schedName != "" && d.maintainer.SchedulerExist(schedName) && d.maintainer.IsSchedulerInActiveQueue(schedName) {
d.DispatchInfo.AddPodInAdvance(pod, schedName)
if utilfeature.DefaultFeatureGate.Enabled(features.SupportRescheduling) {
Expand All @@ -395,16 +413,20 @@ func (d *Dispatcher) selectSchedulerForUnit(pg *scheduling.PodGroup, pod *v1.Pod
forceUpdate = true
}
klog.V(4).InfoS("Selected a new scheduler for the podGroup", "podGroup", klog.KObj(pg))
// select a scheduler for the first dispatchable pod of the PodGroup.
// select a scheduler for the first dispatchable pod of the PodGroup. 为 PodGroup 的第一个 dispatchable pod 选择 scheduler。
schedName, err = d.pickScheduler(pod)
if err != nil {
return "", err
}
// store the assigned scheduler in the unit info cache
// store the assigned scheduler in the unit info cache. 将指定的 scheduler 保存在 unit 缓存中,ui.scheduler
err = d.UnitInfos.AssignSchedulerToPodGroupUnit(pg, schedName, forceUpdate)
return schedName, err
}

// 为 pod 选择一个 scheduler
// 1. 获取 podOwner 和 podGroup Name
// 2. podGroup Name 为空,直接按照 loadBalancing 策略选择节点数最少的 scheduler
// 3. 若 podGroup Name 不为空,则
func (d *Dispatcher) selectScheduler(pod *v1.Pod) (string, error) {
podOwner := podutil.GetPodOwner(pod)
pgName := unitutil.GetPodGroupName(pod)
Expand Down Expand Up @@ -462,6 +484,8 @@ func (d *Dispatcher) sendPodToScheduler(pod *v1.Pod, podInfo *queue.QueuedPodInf
if podCopy.Annotations == nil {
podCopy.Annotations = make(map[string]string)
}
// "godel.bytedance.com/selected-scheduler": schedulerName
// "godel.bytedance.com/pod-state": "dispatched"
podCopy.Annotations[podutil.SchedulerAnnotationKey] = schedulerName
podCopy.Annotations[podutil.PodStateAnnotationKey] = string(podutil.PodDispatched)
if _, ok := podCopy.Annotations[podutil.InitialHandledTimestampAnnotationKey]; !ok {
Expand Down
27 changes: 24 additions & 3 deletions pkg/dispatcher/eventhandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,9 @@ func (d *Dispatcher) addPodToPendingOrSortedQueue(obj interface{}) {

// if the pod is associated to a unit, which is not ready, add
// it to the corresponding pending unit.
// 通过 pod annotation "godel.bytedance.com/pod-group-name" 判断是否属于 unit,
// 如果属于,则添加到 UnitInfos 的 unSortedPods 中,
// 否则添加到 FIFOPendingPodsQueue 中,直接 dispatch
if frwkutils.PodBelongToUnit(pod) {
klog.V(5).InfoS("DEBUG: added unsorted pod to unit infos", "pod", klog.KObj(pod))
d.UnitInfos.AddUnSortedPodInfo(generateUnitKeyFromPod(pod), podInfo)
Expand Down Expand Up @@ -233,7 +236,7 @@ func AddAllEventHandlers(
nmNodeInformer nodeinformer.NMNodeInformer,
podGroupInformer schedulinginformer.PodGroupInformer,
) {
// pending pods queue
// pending pods queue 等待调度的 pod 队列
podInformer.Informer().AddEventHandler(
cache.FilteringResourceEventHandler{
FilterFunc: func(obj interface{}) bool {
Expand All @@ -259,7 +262,7 @@ func AddAllEventHandlers(
},
)

// dispatched pods queue
// dispatched pods queue 已经 dispatch 的 pod 队列
podInformer.Informer().AddEventHandler(
cache.FilteringResourceEventHandler{
FilterFunc: func(obj interface{}) bool {
Expand All @@ -284,6 +287,7 @@ func AddAllEventHandlers(
},
)

// 重调度相关功能
if utilfeature.DefaultFeatureGate.Enabled(features.SupportRescheduling) {
podInformer.Informer().AddEventHandler(
cache.FilteringResourceEventHandler{
Expand All @@ -310,6 +314,7 @@ func AddAllEventHandlers(
)
}

// 一些异常情况的pod,主要是 annotation 和实际状态不一致的pod
// abnormal state pod queue
podInformer.Informer().AddEventHandler(
cache.ResourceEventHandlerFuncs{
Expand All @@ -318,6 +323,7 @@ func AddAllEventHandlers(
},
)

// scheduler 实例 event handler
schedulerInformer.Informer().AddEventHandler(
cache.ResourceEventHandlerFuncs{
AddFunc: dispatcher.addScheduler,
Expand All @@ -326,14 +332,20 @@ func AddAllEventHandlers(
},
)

// unit infos
// unit infos: 看着似乎是 job or 批调度 相关的功能
// event add:
// 1. pod annotation "godel.bytedance.com/pod-group-name" 有值,将 pod 加入到 UnitInfos.units.pods 中
// 2. ((podGroup 状态不为 pending && unknown) || pod 数量满足要求),将 podGroup(unit) 中的 pod 整体 dispatch
podInformer.Informer().AddEventHandler(
cache.ResourceEventHandlerFuncs{
AddFunc: dispatcher.addPodToUnitInfos,
DeleteFunc: dispatcher.deletePodFromUnitInfos,
},
)
// unit infos
// event add:
// 1. 将 podGroup 加入到 UnitInfos.units.podGroup 中
// 2. (podGroup 状态不为 pending && unknown || pod 数量满足要求),将 podGroup(unit) 中的 pod 整体 dispatch
podGroupInformer.Informer().AddEventHandler(
cache.ResourceEventHandlerFuncs{
AddFunc: dispatcher.addPodGroupToUnitInfos,
Expand All @@ -342,6 +354,7 @@ func AddAllEventHandlers(
},
)

// 当 add update delete 节点时,node shuffler 对节点重新进行分区
if utilfeature.DefaultFeatureGate.Enabled(features.DispatcherNodeShuffle) {
nodeInformer.Informer().AddEventHandler(
cache.ResourceEventHandlerFuncs{
Expand All @@ -351,6 +364,7 @@ func AddAllEventHandlers(
},
)

// nominated-node 相关功能,用于抢占
nmNodeInformer.Informer().AddEventHandler(
cache.ResourceEventHandlerFuncs{
AddFunc: dispatcher.addNMNode,
Expand Down Expand Up @@ -413,6 +427,10 @@ func (d *Dispatcher) deleteScheduler(obj interface{}) {

d.DispatchInfo.DeleteScheduler(scheduler.Name)
d.maintainer.DeleteScheduler(scheduler)

// 从 Reconciler 的 store 中获取到该 scheduler 的 pods,
// 1. 加入到 StaleDispatchedPods Queue,有异步协程会把这个 scheduler 对应的 dispatched pod 置为 pending 状态重新 dispatch
// 2. 从 Reconciler 的 store 中删除这些 pod
d.reconciler.DeleteScheduler(scheduler)
}

Expand All @@ -435,6 +453,7 @@ func (d *Dispatcher) deletePodFromUnitInfos(obj interface{}) {
d.UnitInfos.DeletePod(generateUnitKeyFromPod(pod), podutil.GetPodKey(pod))
}

// event: add PodGroup
func (d *Dispatcher) addPodGroupToUnitInfos(obj interface{}) {
pg, ok := obj.(*scheduling.PodGroup)
if !ok {
Expand All @@ -444,6 +463,8 @@ func (d *Dispatcher) addPodGroupToUnitInfos(obj interface{}) {

klog.V(4).InfoS("Started to handle Add event for PodGroup",
"podGroup", klog.KObj(pg))

// UnitInfos 中添加 PodGroup
d.UnitInfos.AddPodGroup(pg)
}

Expand Down
Loading