文章目錄
- Pod調度
- 實現原理
- 調度隊列
- 優先隊列
- 底層數據
- 調度緩存
- 調度框架
Pod調度
Pod調度: 通過污點、容忍度和親和性影響Pod的調度
- 調度器實現, 其基于配置器構造(其配置來源于配置API)
- 調度過程中任何插件返回拒絕, 都會導致Pod可能再次返回調度隊列
如: Pod調度簡略流程
調度執行流程:
- 通過
SharedIndedInformer
過濾未調度的Pod放入調度隊列 - 通過
SharedIndexInformer
過濾已調度的Pod更新調度緩存 - 從調度隊列中取出個Pod,通過其
SchedulerName
執行特定調度框架 - 調度框架觸發調度算法利用調度緩存為Pod選擇最優的Node進行異步綁定
實現原理
調度器(Scheduler): Pod調度決策
- 調度過程的數據依賴于瞬間的緩存
- Pod調度完成后需等待調度插件的批準才可執行綁定
- 基于模塊: 調度隊列、調度緩存、調度框架、調度算法
以下源碼分析均都基于v1.28.1
版本的Kubernetes源碼
// https://github1s.com/kubernetes/kubernetes/blob/HEAD/pkg/scheduler/scheduler.go#64// Scheduler 監視未調度的Pod并嘗試找到適合的Node, 并將綁定信息寫回到API Server
type Scheduler struct {// Cache 調度緩存, 緩存所有的Node狀態// 每次調度前都需更新快照, 以便調度算法使用Cache internalcache.Cache// Extenders 調度插件Extenders []framework.Extender// NextPod 獲取下個要調度的Pod, 沒有則阻塞goroutine// 不能直接從調度隊列中獲取調度的Pod, 其不能日志記錄調度PodNextPod func() (*framework.QueuedPodInfo, error)// FailureHandler 調度時的回調錯誤函數FailureHandler FailureHandlerFn// SchedulePod 調度算法給出Pod可調度的NodeSchedulePod func(ctx context.Context, fwk framework.Framework, state *framework.CycleState, pod *v1.Pod) (ScheduleResult, error)// StopEverything 關閉Scheduler的信號StopEverything <-chan struct{}// SchedulingQueue 緩存等待調度的PodSchedulingQueue internalqueue.SchedulingQueue// Profiles 調度框架的配置(Profile和Frameword屬于1:1)Profiles profile.Mapclient clientset.InterfacenodeInfoSnapshot *internalcache.SnapshotpercentageOfNodesToScore int32nextStartNodeIndex intlogger klog.LoggerregisteredHandlers []cache.ResourceEventHandlerRegistration
}// New 調度器構造函數
func New(ctx context.Context,client clientset.Interface,informerFactory informers.SharedInformerFactory,dynInformerFactory dynamicinformer.DynamicSharedInformerFactory,recorderFactory profile.RecorderFactory,opts ...Option) (*Scheduler, error) {// Kubernetes內部記錄器, 并獲取終止信號logger := klog.FromContext(ctx)stopEverything := ctx.Done()// 在默認的schedulerOptions基礎上應用所有的optsoptions := defaultSchedulerOptionsfor _, opt := range opts {opt(&options)}// 是否應用默認調度框架配置if options.applyDefaultProfile {var versionedCfg configv1.KubeSchedulerConfigurationscheme.Scheme.Default(&versionedCfg)cfg := schedulerapi.KubeSchedulerConfiguration{}if err := scheme.Scheme.Convert(&versionedCfg, &cfg, nil); err != nil {return nil, err}options.profiles = cfg.Profiles}// 創建InTree插件Factory注冊表, 并與OutTree插件Factory注冊表合并以形成插件Factory注冊表// 數據類型為(插件Factory就是插件的構造函數): map[插件名稱]插件Factoryregistry := frameworkplugins.NewInTreeRegistry()if err := registry.Merge(options.frameworkOutOfTreeRegistry); err != nil {return nil, err}// 注冊調度器的度量標準metrics.Register()// 調度器的擴展程序extenders, err := buildExtenders(logger, options.extenders, options.profiles)if err != nil {return nil, fmt.Errorf("couldn't build extenders: %w", err)}// 獲取Pod和Node數據// 以便做成快照數據, 提供調度依據podLister := informerFactory.Core().V1().Pods().Lister()nodeLister := informerFactory.Core().V1().Nodes().Lister()// 初始化快照數據和度量指標記錄器(異步)snapshot := internalcache.NewEmptySnapshot()metricsRecorder := metrics.NewMetricsAsyncRecorder(1000, time.Second, stopEverything)// 初始化調度器的配置文件profiles, err := profile.NewMap(ctx, options.profiles, registry, recorderFactory,frameworkruntime.WithComponentConfigVersion(options.componentConfigVersion),frameworkruntime.WithClientSet(client),frameworkruntime.WithKubeConfig(options.kubeConfig),frameworkruntime.WithInformerFactory(informerFactory),frameworkruntime.WithSnapshotSharedLister(snapshot),frameworkruntime.WithCaptureProfile(frameworkruntime.CaptureProfile(options.frameworkCapturer)),frameworkruntime.WithParallelism(int(options.parallelism)),frameworkruntime.WithExtenders(extenders),frameworkruntime.WithMetricsRecorder(metricsRecorder),)if err != nil {return nil, fmt.Errorf("initializing profiles: %v", err)}if len(profiles) == 0 {return nil, errors.New("at least one profile is required")}// 配置中添加PreEnqueue和Queueing Hint// PreEnqueue用于在Pod加入調度隊列的前置操作// Queueing Hint過濾事件, 防止調度Pod時的無用重試 preEnqueuePluginMap := make(map[string][]framework.PreEnqueuePlugin)queueingHintsPerProfile := make(internalqueue.QueueingHintMapPerProfile)for profileName, profile := range profiles {preEnqueuePluginMap[profileName] = profile.PreEnqueuePlugins()queueingHintsPerProfile[profileName] = buildQueueingHintMap(profile.EnqueueExtensions())}// 創建調度隊列podQueue := internalqueue.NewSchedulingQueue(profiles[options.profiles[0].SchedulerName].QueueSortFunc(),informerFactory,internalqueue.WithPodInitialBackoffDuration(time.Duration(options.podInitialBackoffSeconds)*time.Second),internalqueue.WithPodMaxBackoffDuration(time.Duration(options.podMaxBackoffSeconds)*time.Second),internalqueue.WithPodLister(podLister),internalqueue.WithPodMaxInUnschedulablePodsDuration(options.podMaxInUnschedulablePodsDuration),internalqueue.WithPreEnqueuePluginMap(preEnqueuePluginMap),internalqueue.WithQueueingHintMapPerProfile(queueingHintsPerProfile),internalqueue.WithPluginMetricsSamplePercent(pluginMetricsSamplePercent),internalqueue.WithMetricsRecorder(*metricsRecorder),)// 假定調度指向該調度器的調度隊列for _, fwk := range profiles {fwk.SetPodNominator(podQueue)}// 調度緩存// durationToExpireAssumedPod 代表綁定的TTL// 若該時間內未完成綁定, 則從調度緩存中移除該假定調度PodschedulerCache := internalcache.New(ctx, durationToExpireAssumedPod)// 調度器的Debuggerdebugger := cachedebugger.New(nodeLister, podLister, schedulerCache, podQueue)debugger.ListenForSignal(ctx)// 創建調度器, 并從調度隊列中彈出個Pod開始調度// 并指定調度器默認的調度流程和調度失敗的回調函數sched := &Scheduler{Cache: schedulerCache,client: client,nodeInfoSnapshot: snapshot,percentageOfNodesToScore: options.percentageOfNodesToScore,Extenders: extenders,StopEverything: stopEverything,SchedulingQueue: podQueue,Profiles: profiles,logger: logger,}sched.NextPod = podQueue.Popsched.applyDefaultHandlers()// 注冊事件處理函數// 新建Pod先放入調度隊列, 綁定成功后由該函數更新調度緩存以確認// 本質: 通過SharedIndexInformer監控Pod和NService等調度依賴的資源, 并根據事件執行對應操作if err = addAllEventHandlers(sched, informerFactory, dynInformerFactory, unionedGVKs(queueingHintsPerProfile)); err != nil {return nil, fmt.Errorf("adding event handlers: %w", err)}return sched, nil
}
調度器執行流程如下(主要通過scheduleOne()
方法實現):
- 調度器從調度隊列中獲取個Pod
- 先通過
NextPod()
方法從調度隊列中獲取Pod(可記錄日志) - 調度器會循環從調度隊列中獲取Pod, 沒有待調度的Pod就直接返回
- 先通過
- 對獲取的Pod做調度前的預處理
- 獲取Pod指定的調度框架(
spec.SchedulerName
) - 配置Pod調度的環境(計時、
CycleState
和schedulingCycleCtx
等)
- 獲取Pod指定的調度框架(
- 判斷是否忽略Pod的調度
- Pod已被刪除, 則直接忽略
- Pod被更新, 但已調度/假定調度(根據Pod的更新決定是否重新調度)
- 通過
Score
匹配最優NodeScore
會基于多種因素計算出最適合Pod的Node- 若沒有Node能滿足Pod的資源需求, 則Pod通過
PostFilter
進行搶占式調度 - 若Pod是搶占式調度的, 則Pod當前依然是不可調度的(需等待被搶占的Pod優雅退出)
- 記錄所有調度失敗的原因(
FailureHandler()
)- 記錄導致Pod調度失敗的事件(
kubectl describe pod
命令時的信息) - 從
SharedIndexInformer
緩存中獲取Pod最新狀態, 決定是否將調度失敗的Pod再次放回調度隊列
- 記錄導致Pod調度失敗的事件(
- 假定調度Pod(調度器無需等待可立刻調度下個Pod)
- 調度緩存中更新Pod已綁定匹配的Node
- 若TTL內未綁定成功, 則判定假定調度失敗并從調度緩存中刪除綁定
- 為Pod預留全局資源
- 若預留資源失敗, 則刪除已預留的資源和調度緩存中假定調度信息并記錄失敗信息
- 判定Pod是否可進入綁定周期
- 需等待所有插件批準才可執行綁定
- 若未批準會執行: 刪除預留資源、刪除假定調度Pod、記錄失敗信息
- Pod綁定Node, 異步執行
- 綁定預處理(按順序執行調度框架的各個插件)
- 執行綁定, 向API Server寫入信息(先
Extender
后Bind
, 因部分資源只有前者可管理) - 若綁定成功, 則通知調度緩存并記錄綁定成功事件(綁定失敗也會記錄事件)
- 若綁定失敗會執行: 刪除預留資源、刪除假定調度Pod、記錄失敗信息
如: 調度整體流程
調度隊列
調度隊列(SchedulingQueue): Pod調度過程中的Pod獲取順序
- 調度隊列具有冪等性(每次操作前均判斷是否已存在)
- 調度隊列的實現是個優先隊列(由傳入的函數決定其優先級)
優先隊列
優先隊列(PriorityQueue): 基于map以優先級方式實現調度隊列
- 優先隊列由三個子隊列構成: 就緒隊列、不可調度隊列、退避隊列
// https://github1s.com/kubernetes/kubernetes/blob/release-1.28/pkg/scheduler/internal/queue/scheduling_queue.go#L150
// 實現SchedulingQueue接口: https://github1s.com/kubernetes/kubernetes/blob/release-1.28/pkg/scheduler/internal/queue/scheduling_queue.go#L90// PriorityQueue 優先級方式實現的調度隊列
//
// activeQ(就緒隊列): 存儲等待被調度的Pod(從該隊列中Pop出的Pod), 默認存儲新添加的Pod
// backoffQ(退避隊列): 存儲等待特定時間后可調度的Pod(再次放入activeQ), 等待時間根據嘗試次數進行指數級增長(默認上限10s)
// unschedulablePods(不可調度隊列): 存儲由各種原因導致無法調度的Pod, 經過特定周期后會再次加入activeQ(默認60s)
type PriorityQueue struct {*nominatorstop chan struct{}clock clock.Clock// podInitialBackoffDuration Pod的初始退避時間, 默認1s// Pod后續每次調度失敗, 該時間就以二次方增加podInitialBackoffDuration time.Duration// podMaxBackoffDuration Pod的最大退避時間, 默認10spodMaxBackoffDuration time.Duration// podMaxInUnschedulablePodsDuration Pod可處于unschedulablePods的最大時間podMaxInUnschedulablePodsDuration time.Durationcond sync.Cond// inFlightPods 返回所有當前正在處理的PodinFlightPods map[types.UID]inFlightPod// receivedEvents 返回調度隊列收到的所有事件receivedEvents *list.List// activeQ 存儲待調度的Pod// 頭部Pod是具有最高優先級的Pod(最先調度)activeQ *heap.Heap// podBackoffQ 按照退避時間到期排序// 完成退避的Pod將在調度器查看activeQ之前從其中彈出podBackoffQ *heap.Heap// unschedulablePods 返回已嘗試并確定無法調度的PodunschedulablePods *UnschedulablePods// schedulingCycle 返回調度周期schedulingCycle int64// moveRequestCycle 緩存移動請求時的調度周期// 當接受到移動請求并正在調度不可調度Pod時, 則將其放回activeQmoveRequestCycle int64// preEnqueuePluginMap PreEnqueue插件的配置(K為配置文件名)preEnqueuePluginMap map[string][]framework.PreEnqueuePlugin// queueingHintMap Queueing Hint插件的配置(K為配置文件名)queueingHintMap QueueingHintMapPerProfile// closed 隊列是否關閉closed boolnsLister listersv1.NamespaceListermetricsRecorder metrics.MetricAsyncRecorderpluginMetricsSamplePercent intisSchedulingQueueHintEnabled bool
}// https://github1s.com/kubernetes/kubernetes/blob/release-1.28/pkg/scheduler/internal/queue/scheduling_queue.go#L1261// UnschedulablePods 不可調度Pod的隊列, 對Map的再次封裝
type UnschedulablePods struct {// podInfoMap 存儲Pod的map, K為Pod的名稱podInfoMap map[string]*framework.QueuedPodInfo// keyFunc 獲取對象K的函數keyFunc func(*v1.Pod) string// unschedulableRecorder 監控數據unschedulableRecorder, gatedRecorder metrics.MetricRecorder
}
如: 優先隊列的流程
底層數據
底層數據: 封裝用于調度隊列存儲對象的底層數據結構
- Heap: 對map的再次封裝, 具有slice的順序性和map的高效檢索能力
- QueuedPodInfo: 對Pod的再次封裝, 具有調度隊列存儲相關的信息
// https://github1s.com/kubernetes/kubernetes/blob/HEAD/pkg/scheduler/framework/types.go#167// QueuedPodInfo 在Pod基礎上封裝關于調度隊列的信息
type QueuedPodInfo struct {*PodInfo// Timestamp Pod添加到調度隊列的時間// Pod可能會頻繁從取出再放入, 該時間便于處理PodTimestamp time.Time// Attempts Pod重試調度的次數Attempts int// InitialAttemptTimestamp Pod首次添加到調度隊列的時間// 初始化后不再更新, 用于計算調度完成所需時間InitialAttemptTimestamp *time.Time// UnschedulablePlugins Pod調度周期中導致失敗的插件名稱// 僅對PreFilter, Filter, Reserve, Permit(WaitOnPermit)插件有效UnschedulablePlugins sets.Set[string]// Gated 是否由 PreEnqueuePlugin 調度Gated bool
}
// https://github1s.com/kubernetes/kubernetes/blob/release-1.28/pkg/scheduler/internal/heap/heap.go#L127// Heap 實現堆數據結構的生產者/消費者隊列, 可用于優先級隊列類似的數據結構
type Heap struct {// data 存儲數據對象data *data// metricRecorder 監控數據metricRecorder metrics.MetricRecorder
}// data 實現標準庫的 Heap 接口
type data struct {// items 通過map管理所有對象items map[string]*heapItem// queue 通過slice管理所有對象的K(namespace + pod name)queue []string// keyFunc 獲取對象K的函數// 用于操作queue, 應保證該函數的確定性keyFunc KeyFunc// lessFunc 比較兩個對象的函數(用于排序)lessFunc lessFunc
}
調度緩存
調度緩存(SchedulerCache): 獲取Etcd中Pod和Node的綁定等調度相關所需的信息
- Node信息中已包含所有運行在該Node上的Pod信息
- 調度緩存會維護段時間已刪除的Node, 直到Node沒有Pod
- 調度緩存中的Node有虛實之分, 虛Node實現Node增加/刪除時的正常調度(
nodeTree
僅存儲實Node)
// https://github1s.com/kubernetes/kubernetes/blob/release-1.28/pkg/scheduler/internal/cache/cache.go#L57
// 實現Cache接口: https://github1s.com/kubernetes/kubernetes/blob/release-1.28/pkg/scheduler/internal/cache/interface.go#L60// cacheImpl 實現調度緩存接口
type cacheImpl struct {// top 調度緩存的停止Chanstop <-chan struct{}// ttl 假定調度綁定的超時時間, 默認30sttl time.Duration// period 定期清除假定調度綁定超時的Pod, 默認60speriod time.Duration// mu 讀寫鎖保證并發安全mu sync.RWMutex// assumedPods 假定調度Pod集合assumedPods sets.Set[string]// podStates 所有Pod信息podStates map[string]*podState// nodes 所有Node信息nodes map[string]*nodeInfoListItem// headNode Node雙向鏈表中首個Node// 基于特定規則排序, 鏈表的排序效率高于sliceheadNode *nodeInfoListItem// nodeTree 節點按照zone組成成的樹狀數據結構nodeTree *nodeTree// imagesStates 鏡像狀態imageStates map[string]*imageState
}
快照(Snapshot): 調度緩存某瞬間下的副本
- 作用:通過增量更新和只讀, 避免頻繁獲取和讀寫鎖損失性能
- 調度器在執行每個調度周期前, 都會獲取個快照作為調度的數據依據
- 每次調度都會根據調度緩存更新快照中的Node信息以保證狀態一致(僅更新部分信息)
// https://github1s.com/kubernetes/kubernetes/blob/release-1.28/pkg/scheduler/internal/cache/snapshot.go#L29// Snapshot 緩存 NodeInfo 和 NodeTree 快照
type Snapshot struct {// nodeInfoMap Node信息(map[namespace + name]NodeInfo)nodeInfoMap map[string]*framework.NodeInfo// nodeInfoList 按照NodeTree排序的Node全集列表(不包含已刪除的Node)nodeInfoList []*framework.NodeInfo// havePodsWithAffinityNodeInfoList 處理具有親和性的PodhavePodsWithAffinityNodeInfoList []*framework.NodeInfo// havePodsWithRequiredAntiAffinityNodeInfoList 處理具有反親和性的PodhavePodsWithRequiredAntiAffinityNodeInfoList []*framework.NodeInfo// usedPVCSet 調度Pod使用的PVCusedPVCSet sets.Set[string]// generation Node的配置紀元// 所有NodeInfo.Generation中的最大值(其均源于全局Generation變量)generation int64
}
假定調度Pod(Assume): 調度結果寫入Etcd
- 異步綁定假定結果, 調度器繼續調度其他Pod以保證性能
- 假定調度綁定時會預先占用資源防止再次分配, 但真正綁定失敗會釋放占用資源
- 假定調度Pod具有綁定限定時間, 超時未真正綁定會釋放占用資源和清除假定調度Pod
- 當真正綁定時會刪除假定調度Pod, 并對假定調度Pod占用資源進行轉移
如: 調度緩存流程
調度框架
調度框架: Kubernetes調度器的插件架構(調度插件的集合)
- 擴展點: 調度插件注冊后執行位置(提供信息或調度決策)
- Pod的調度流程分為兩個周期: 調度周期(串行)、綁定周期(并行)
- 句柄(Handler): 為插件提供服務(提供額外功能, 協助插件完成功能)
- 配置后的調度框架就是個調度器, 配置后的調度插件就是個調度算法
// https://github1s.com/kubernetes/kubernetes/blob/release-1.28/pkg/scheduler/framework/runtime/framework.go#L49
// 實現Framework接口: https://github1s.com/kubernetes/kubernetes/blob/release-1.28/pkg/scheduler/framework/interface.go#L512// frameworkImpl
type frameworkImpl struct {// registry 調度插件注冊表, 通過其創建配置的插件registry Registry// snapshotSharedLister 基于快照的ListersnapshotSharedLister framework.SharedLister// waitingPods 存儲等待批準的PodwaitingPods *waitingPodsMap// scorePluginWeight 插件的權重映射(K為插件名稱)scorePluginWeight map[string]int// 所有擴展點的插件, 用于實現Framework接口的各個方法// 每個擴展點都會遍歷執行插件, 且均在構造函數中通過SchedulingProfile生成preEnqueuePlugins []framework.PreEnqueuePluginenqueueExtensions []framework.EnqueueExtensionsqueueSortPlugins []framework.QueueSortPluginpreFilterPlugins []framework.PreFilterPluginfilterPlugins []framework.FilterPluginpostFilterPlugins []framework.PostFilterPluginpreScorePlugins []framework.PreScorePluginscorePlugins []framework.ScorePluginreservePlugins []framework.ReservePluginpreBindPlugins []framework.PreBindPluginbindPlugins []framework.BindPluginpostBindPlugins []framework.PostBindPluginpermitPlugins []framework.PermitPluginclientSet clientset.InterfacekubeConfig *restclient.ConfigeventRecorder events.EventRecorderinformerFactory informers.SharedInformerFactorylogger klog.LoggermetricsRecorder *metrics.MetricAsyncRecorderprofileName stringpercentageOfNodesToScore *int32extenders []framework.Extenderframework.PodNominatorparallelizer parallelize.Parallelizer
}
如: 調度周期和綁定周期執行流程(調度上下文)
- Kubernetes集群中可存在多個調度框架
- 擴展點是插件的設計接口, 調度需12個插件
- 帶有Pre前綴的插件都是提供信息的, 其他均是做決策的
- 調度框架就是個調度器, 配置好的調度插件就是調度算法
調度插件: 影響Pod調度的各組件
- 調度插件分為多種, 而每個調度插件可有多種實現
- 調度插件都虛靜態編譯到注冊中, 且通過唯一性的名稱區分
- 插件均是無狀態的(插件存儲狀態需依賴外部實現), 且插件間通信依賴于
CycleState
- 每種插件類型可實現多種類型的插件接口(該插件可在插件框架中的多個擴展位置作用)
// https://github1s.com/kubernetes/kubernetes/blob/release-1.28/pkg/scheduler/framework/cycle_state.go#L48// CycleState 基于共享變量實現插件之間數據傳輸
// 僅作為單詞調度周期中, 各個插件之間通信(調度上下文)
//
// 未提供任何數據保護, 對所有插件都認為是可信的
type CycleState struct {// storage 存儲數據storage sync.Map// recordPluginMetrics 是否監控recordPluginMetrics bool// SkipFilterPlugins 將在Filter擴展點忽略的插件SkipFilterPlugins sets.Set[string]// SkipScorePlugins 將在Score擴展點忽略的插件SkipScorePlugins sets.Set[string]
}
調度框架中各調度插件的說明:
插件名稱 | 說明 |
---|---|
Sort | 排序等待調度的Pod (默認按照優先級) |
PreFilter | 處理Pod相關信息為過濾Node做準備 (過濾前的處理) |
Filter | 過濾無法運行該Pod的Node (對多節點并發應用多個Filter插件) |
PostFilter | 搶占調度 (僅在Filter過濾不出Node時執行) |
PreScore | 處理Pod相關信息為Node評分做準備 (主要處理親和性、拓撲分步和容忍度) |
Score | 對所有過濾的Node評分并排序 (首個Node則為Pod的最優選擇) |
NormalizeScore | 修改已排序的Node評分 (提高Node評分的擴展性) |
Reserve | 維護全局調度狀態 (防止下次調度與本次綁定完成前發生競爭) |
Premit | 標注Pod狀態以防止或延遲Pod綁定 (Pod狀態可為: 批準、等待、延遲) |
Prebind | 處理Pod綁定需完成的操作 (常用于完成PV和PVC) |
Bind | Pod與Node綁定 (僅作用單個Bind插件) |
PostBind | 清理綁定期間使用的資源 (沒有默認實現, 用于自定義擴展) |
- Pre前綴的插件是預處理并提供信息, 同時避免部分真正重量操作重復執行