乾貨好文|一文洞悉kubernetes資源排程機制

語言: CN / TW / HK

kube-scheduler 在k8s叢集中負責pod的排程。他主要的職責是監聽pod 資源,對沒有繫結node 的pod,根據特定的排程演算法與策略選擇叢集中最優的work node去執行這個pod。

本文基於kube-scheduler v1.21.1版本,對kube-scheduler的執行機制結合程式碼做一個簡單的解讀。

基本原理

kube-scheduler在設計上並不複雜,為pod獲取最優節點主要分為2個階段3個步驟

排程階段

  • predicates,為帶排程的pod過濾叢集中不適合執行的節點。kube-scheduler提供了一些過濾策略,多個策略可以組合使用。
  • priority,經過predicate過濾後, priority負責對剩餘的節點「評分」,比如剩餘資源較多的節點會獲得較高的評分,符合Pod NodeAffinityPriority的節點會獲得較高的評分。 priority階段只會返回一個節點,當最高分的節點有多個時將按照round-robin的方式選擇Node, kube-scheduler 內建了一些「評分策略」,同樣也可以組合使用。

繫結階段

  • bind,為pod繫結提名節點,傳送請求到apiserver。

擴充套件排程策略

當kube-scheduler預設載入的過濾策略與優先順序演算法不滿足我們的需求時,kube-scheduler也提供了介面讓我們「擴充套件」排程策略。

scheduler policy

  • 通過kube-scheduler的配置檔案,選擇使用哪些過濾策略和評分演算法,只允許使用k8s 已實現好的過濾策略和評分策略
  • 在policy配置中定義scheduler extender,通過webhook來擴充套件排程策略,開發者提供兩個介面分別用來處理"過濾"和“評分”。 一個介面用來處理kube-scheduler已經過濾後的節點列表。 一個介面用來處理kube-scheduler已經評分後的節點列表。

scheduler framework

  • pod的排程流程以外掛的方式實現,原有的過濾策略和評分策略全部都以in tree plugin的方式實現。對過濾、評分策略的擴充套件以out of tree plugin方式實現。 整個排程流程提供豐富的擴充套件點,每個擴充套件點繫結plugin,在排程流程走到相應的擴充套件點時,按順序執行該擴充套件點的plugin。開發者可以向kube-scheduler註冊外掛,以介入pod的排程繫結流程。
  • kube-scheduler v1.15版提出,v1.18版release,後續擴充套件排程策略的主流方案。

Scheduler Framework

排程系統的在排程時的目的往往是動態的,可能是成本優先、質量優先、最大資源利用率優先等等,這與業務場景有關。

正是因為排程系統的排程策略是與業務場景相關聯的,很難用一套排程策略滿足所有業務場景。越來越多的排程策略加入到kube-scheduler中,使得kube-scheduler的排程邏輯越來越複雜,複雜的排程器是難以維護的,早期的kube-scheduler雖然也具備了擴充套件能力使得開發者可以為特定的業務場景設計排程策略,但受到如下方面的限制。

  • 擴充套件點的數量只有兩個:過濾後、評分後。擴充套件程式只能在kube-scheduler過濾後與評分後介入排程流程。
  • kube-scheduler與擴充套件程式的程式使用HTTP通訊,每一次通訊都設計json的序列化與反序列化,效能較差。
  • 擴充套件程式作為一個獨立的程式,要麼只處理kube-scheduler傳遞過來的資料,要麼自行構建一套k8s資源快取,存在額外的資源開銷
  • 擴充套件程式無法感知被排程的資源當前處於什麼狀態。如果一個pod被kube-scheduler判定為不可排程,擴充套件程式是無法感知的。

因為這些限制無法構建一個高效能、多功能的排程器,為此社群提出scheduler framework來解決kube-scheduler的擴充套件與效能問題。使得排程程式的核心更簡單,方便維護。 

接下來我們來聊一聊scheduler framework定義的一些 「物件」分別表示什麼,承擔什麼作用。

ExtensionPoints(擴充套件點)

擴充套件點表示在排程繫結週期中的一個“階段”,kube-scheduler會在每一個階段執行做一些事情,以完成pod的排程繫結。Scheduler Frameworker的工作流按照以下順序,執行相應擴充套件點的外掛。每一個擴充套件點可以繫結多個外掛,一個擴充套件點的每一個外掛都需要返回處理結果,如果處理結果為錯誤,則該pod將直接打回「待排程佇列」等待下一次排程。

Predicates

從待排程的pod佇列中,拿出一個pod開始實現排程繫結邏輯時,將依次通過預定義的「擴充套件點」,擴充套件點對於開發者來說是一個介面,實現這個介面的物件被稱之為「外掛」。

一個外掛可以實現多個擴充套件點介面。

多個外掛實現同一個擴充套件點介面,Schduler會按序執行每一個外掛。

  • 在走到post filter擴充套件點時,按順序執行相應的plugin,如果有一個外掛返回成功或失敗,那麼其他的plugin都不會執行。
  • 在走到bind階段時,按順序執行相應的plugin,如果有一個plugin執行了繫結,那麼其他plugin都不會執行

CycleState

每一個pod的排程繫結流程,都會關聯一個CycleState物件,它用於儲存當前pod整個排程繫結流程的所有資料,在每一個擴充套件點中,所有的plugin都可以拿到這個物件,可以從該物件讀取或寫入一些必要的資訊。

CycleState是對所有plugin共享的。

擴充套件點詳解

在每一個擴充套件點,外掛都可以介入進來執行一些“操作”,下面對每一個擴充套件點做一個簡單的說明。

  • queueSort 待排程佇列的重排序,plugin可以在這個階段把優先順序比較高的pod放到佇列的前面,使得他們可以優先排程。
  • PreFilter ,過濾之前的預處理,檢查pod,可以將pod標記為不可排程。這個擴充套件點framework是不會提供node資訊的,只能對pod做檢查。雖然可以通過ClientSet去拿所有的node資訊,但不推薦,在大規模k8s環境下,會影響效率。
  • Filter  ,檢查node是否可以執行待排程的pod。外掛在這個擴充套件點拿到的是node資訊,不是node列表。外掛僅需檢查framework提供的node是否可以運行當前的pod
  • PostFilter,在filter擴充套件點,無法找到一個合適的node,被判定為無法排程時,就會觸發該擴充套件點外掛執行。內建的外掛通過這個擴充套件點,實現了高優先順序pod資源搶佔機制。
  • PreScore,前置評分主要用於對pod和候選node列表做一個檢查,並將處理結果放到CycleState中,以方便在Score擴充套件為評分邏輯提供一些輔助資訊。
  • Score, 給filter後產生的可用節點列表打分,返回可用node列表及其評分。
  • NormalizeScore,他是Score擴充套件點介面定義的一個方法,在評分完成後,還可以修正每個node分數。它必須要保證評分的範圍在0-100以內。
  • framework在拿到評分結果後會選擇評分最高的節點返回。
  • Reserve,實現該擴充套件點的外掛,可為pod預留關聯資源,如pod關聯了pvc資源,那麼在kube-scheduler的快取中提前為pod繫結pvc與pv資源。
  • Reserve擴充套件點需要實現reserve、unreserve方法,分別是執行資源預留與回滾資源預留。
  • kube-scheduler的VolumeBinding外掛在這個擴充套件點實現了pod關聯的pvc、pv資源預留。
  • Permit,可以阻止或延遲pod繫結node
  • PreBind,繫結前執行一些任務,如將pod的關聯資源的繫結持久化至k8s。如果PreBind失敗會執行Reserve擴充套件點的Unreserve方法回滾pod的關聯資源繫結。
  • Bind, 將pod和node繫結,實現bind擴充套件點的外掛按順序呼叫,只要有一個外掛完成繫結,後續外掛都會全部跳過。Bind失敗會執行Reserve擴充套件點的Unreserve方法回滾pod的關聯資源繫結。
  • PostBind, 繫結後執行一些邏輯,可以做一些清理工作。

任意一個擴充套件點外掛返回錯誤都會中斷該pod的排程,並返回到排程佇列

Multi Scheduler

Scheduler Framework的擴充套件能力遠遠超過之前Scheduler Policy,同時基於Scheduler Framework引入了新的Scheduler配置:KubeSchedulerProfile。

在KubeSchedulerProfile中可以定義每一個擴充套件點使用哪些外掛,禁用哪些外掛。

在KubeSchedulerProfile中可以定義多個Profile。一個Profile表示一個排程器,在kube-scheduler初始化時會讀取KubeSchedulerProfile建立多個framework物件,kube scheduler 通過pod.spec.schedulerName,找到對應的framework物件,使用該物件為pod執行排程繫結流程

如下定義了兩個排程器:default-scheduler、no-scoring-scheduler。

apiVersion: kubescheduler.config.k8s.io/v1beta2kind: KubeSchedulerConfigurationprofiles:  - schedulerName: default-scheduler  - schedulerName: no-scoring-scheduler    plugins:      preScore:        disabled:        - name: '*'      score:        disabled:        - name: '*'

基於Scheduler Framework的排程繫結實現

排程佇列的設計

kube-scheduler的pod排程佇列由PriorityQueue物件實現。他最核心的資料結構主要是3個子佇列

activeQ

activeQ子佇列包含正在等待排程的 pod,由「堆」資料結構實現。queueSort擴充套件點的外掛可以對該佇列中的pod 做排序,以實現高優先順序的pod優先排程

kube-scheduler內建的queueSort擴充套件點外掛 queueSortPlugin會將activeQ佇列中的pod,按照優先順序與建立時間排序。將高優先順序、建立時間早的pod放在佇列頭部。

同時activeQ中的pod  被拿出來時,會關聯一個SchedulingCycle。他是排程佇列裡面的一個計數器,每次pod被拿出來就加1。

podBackoffQ

backoff 是併發程式設計中常見的一種機制,就是如果一個任務重複執行,但依舊失敗,則會按照失敗的次數提高重試等待時間,避免頻繁重試浪費資源。

執行失敗的pod都會放到backoff佇列中,並在一段時間後移至activeQ中,這個”一段時間“具體是多久,則取決於它的失敗次數,最大不會超過10s(DefaultPodMaxBackoffDuration)

當你觀察k8s叢集裡一個 一直crash的pod,他的status會變成crashLoopBackoff,像這種pod都會進入podBackoffQ佇列

unschedulableQ

在排程時,被判定為無法排程的pod,都將存放至該佇列。 無法排程是指pod的要求,當前叢集無法滿足,如pod的cpu要求高,當前叢集所有節點都不滿足。此時pod就會被判定為無法排程進入unschedulableQ佇列。

排程佇列的使用

排程佇列在執行期間:

每隔1s檢查podBackOffQ佇列中是否有pod可以放進activeQ

每隔30s把unschedulableQ中長時間(預設60s)處於不可排程的pod移至backoff佇列。

每隔0s,scheduler物件從activeQ獲取待排程的pod

當一個pod在排程週期被判定為不可排程進入到unschedulerQ佇列後,如果叢集資源發生了變化,比如新增了node,刪除了pod等。一個不可排程的pod就有了排程成功的可能性。它將等待60s後,定時任務觸發將這個pod移動至backoffQ佇列,並再次從backoffQ佇列移動至activeQ佇列,準備下一次排程。

這樣子對於pod的排程會是一個效率的很低的事情,因為他需要的時間太長了。為了提高效率,scheduler物件監聽pod與node資源發生變化時,都會呼叫PriorityQueue物件的movePodsToActiveOrBackoffQueue方法。該方法會將不可排程的pod會被重新放進activeQ或者backoffQ,同時moveRequestCycle會設定為當前的schedulingCycle。

至於具體移動到哪個佇列,則根據moveRequestCycle是否大於等於SchedulingCycle,如果大於等於則放到backOff佇列,否則放到unschedulerQ佇列。結合前面的分析,只有資源發生變化時,moveRequestCycle才有可能大於等於當前的SchedulingCycle。那麼就說明,在判斷pod無法排程後,k8s叢集環境已經發生了變化。那麼此時判定無法調的pod在叢集變化後,還是有可能可以排程的,所以放到backOff佇列中,是為了讓他儘快發起排程重試。

最終工作流程如下:

編輯

排程主流程

  • scheduler物件從排程佇列的activeQ佇列取待排程的pod
  • scheduler物件根據pod的schedule Name 欄位獲取對應的framework物件。
  • scheduler物件呼叫generic scheduler物件執行framework的 queue sort、prefilter、filter、prescore、score擴充套件點。
  • 若generic scheduler在執行時,返回錯誤
  • 錯誤型別為FitError,表示過濾階段失敗,觸發搶佔機制,執行framework的postfilter擴充套件點
  • 將pod重新加入排程佇列
  • scheduler 物件執行assume,對拿到提名節點的pod在快取中執行節點繫結操作,這樣繫結流程可以非同步去執行。 同時,在kube-scheduler的pod快取中,這個pod是已經在node上正常執行的,那麼在後續pod排程時,對於節點資源的評估,也是包涵這個pod已經佔用節點上的一部分資源的。
  • scheduler物件執行framework的Reserve、Permit擴充套件點。
  • goroutine非同步執行framework的Prebind、Bind、PostBind擴充套件點。

排程週期是同步執行的,完成排程週期的工作流後,會通過goroutine非同步執行繫結週期,這樣可以無需等待繫結結果,立刻為下一個pod開啟排程。

大規模K8s叢集排程瓶頸

當k8s 叢集節點規模比較大時,如果每一個pod都需要遍歷所有node來判定哪個node是”合適“的。那麼這個排程流程效率會變的特別低。

kube-scheduler會控制參與排程的node數量來提高排程效率,在預設情況下,如果k8s節點的數量少於100個,那麼所有的節點都會參與排程。否則,將根據設定的節點百分比選擇部分節點參與排程。

控制節點數量

  • 呼叫prefilter 擴充套件點的外掛,檢查pod是否可以被排程,如果任一PreFilter外掛返回錯誤,那麼pod將打回待排程佇列
  • 遍歷所有節點,並呼叫Filter擴充套件點的外掛,並記錄適合該pod節點的數量,一旦達到數量限制或plugin的filter方法返回失敗,將停止遍歷。
  • 呼叫filter擴充套件點外掛時,可能會執行兩遍filter,具體的原因和搶佔功能有關,文章末尾解釋原因
  • 在尋找適合pod的node列表時,將開啟16個(預設16個)goroutine 並行篩選。每個goroutine會各自負責所有節點中的一部分。
  • 記錄停止遍歷時,node列表遍歷到什麼位置。在下一個pod的排程週期,將從這個位置開始遍歷node列表。這樣可以保障叢集中每一個節點的都有公平的被排程機會。

程式碼分析如下:

計算節點數量限制

func (g *genericScheduler) numFeasibleNodesToFind(numAllNodes int32) (numNodes int32) {  //  對於節點數量少於100的,全部節點參與排程  //  percentageOfNodesToScore是叢集所有節點參與數量的百分比,如果設定為100,就是所有節點都參與排程  if numAllNodes < minFeasibleNodesToFind || g.percentageOfNodesToScore >= 100 {    return numAllNodes  }    adaptivePercentage := g.percentageOfNodesToScore//當numAllNodes大於100時,且配置的百分比小於等於0,那麼這裡需要計算出一個百分比// 計算公式:百分比 = 50 - (總節點數)/125  if adaptivePercentage <= 0 {    basePercentageOfNodesToScore := int32(50)    adaptivePercentage = basePercentageOfNodesToScore - numAllNodes/125    if adaptivePercentage < minFeasibleNodesPercentageToFind {      adaptivePercentage = minFeasibleNodesPercentageToFind    }  }  numNodes = numAllNodes * adaptivePercentage / 100  if numNodes < minFeasibleNodesToFind {    return minFeasibleNodesToFind  }  return numNodes}

獲取可排程節點

// findNodesThatPassFilters finds the nodes that fit the filter plugins.func (g *genericScheduler) findNodesThatPassFilters(  ctx context.Context,  fwk framework.Framework,  state *framework.CycleState,  pod *v1.Pod,  diagnosis framework.Diagnosis,  nodes []*framework.NodeInfo) ([]*v1.Node, error) {  // 計算node數量限制  numNodesToFind := g.numFeasibleNodesToFind(int32(len(nodes)))  // 存放合適的node列表  feasibleNodes := make([]*v1.Node, numNodesToFind)  // 如果沒有外掛實現Filter擴充套件點,就直接擷取所有node列表中的一段  // 從上一次停止查詢的node 後面開始截  if !fwk.HasFilterPlugins() {    length := len(nodes)    for i := range feasibleNodes {      feasibleNodes[i] = nodes[(g.nextStartNodeIndex+i)%length].Node()    }    g.nextStartNodeIndex = (g.nextStartNodeIndex + len(feasibleNodes)) % length    return feasibleNodes, nil  }  errCh := parallelize.NewErrorChannel()  var statusesLock sync.Mutex  var feasibleNodesLen int32  ctx, cancel := context.WithCancel(ctx)  // 執行所有外掛的Filter,  checkNode := func(i int) {    // 從上一個排程週期中離開的節點開始檢查節點是否合適,執行所有外掛的filter    nodeInfo := nodes[(g.nextStartNodeIndex+i)%len(nodes)]    status := fwk.RunFilterPluginsWithNominatedPods(ctx, state, pod, nodeInfo)    if status.Code() == framework.Error {      errCh.SendErrorWithCancel(status.AsError(), cancel)      return    }    // 如果這個節點合適,那麼就把他放到feasibleNodes列表中    if status.IsSuccess() {            length := atomic.AddInt32(&feasibleNodesLen, 1)      if length > numNodesToFind {        cancel()        atomic.AddInt32(&feasibleNodesLen, -1)      } else {        feasibleNodes[length-1] = nodeInfo.Node()      }    } else {      statusesLock.Lock()      diagnosis.NodeToStatusMap[nodeInfo.Node().Name] = status      diagnosis.UnschedulablePlugins.Insert(status.FailedPlugin())      statusesLock.Unlock()    }  }  beginCheckNode := time.Now()  statusCode := framework.Success  defer func() {    // We record Filter extension point latency here instead of in framework.go because framework.RunFilterPlugins    // function is called for each node, whereas we want to have an overall latency for all nodes per scheduling cycle.    // Note that this latency also includes latency for `addNominatedPods`, which calls framework.RunPreFilterAddPod.    metrics.FrameworkExtensionPointDuration.WithLabelValues(runtime.Filter, statusCode.String(), fwk.ProfileName()).Observe(metrics.SinceInSeconds(beginCheckNode))  }()  // 開協程執行filter,直到數量達到限制  fwk.Parallelizer().Until(ctx, len(nodes), checkNode)  // 設定下次開始遍歷node的位置  processedNodes := int(feasibleNodesLen) + len(diagnosis.NodeToStatusMap)  g.nextStartNodeIndex = (g.nextStartNodeIndex + processedNodes) % len(nodes)  feasibleNodes = feasibleNodes[:feasibleNodesLen]  if err := errCh.ReceiveError(); err != nil {    statusCode = framework.Error    return nil, err  }  return feasibleNodes, nil}

搶走低優先順序pod的資源

kube-scheduler為了保障高優先順序的pod可以優先排程,在pod被判定為無法排程時,並不會直接將其放到unschedulerQ佇列,而是檢查有沒有優先順序比當前pod低的的pod可以搶佔。如果有則執行搶佔流程。

pod的priority用來表示pod的優先順序,如果沒有設定這個欄位,那麼pod的優先順序就是0。k8s控制平面的所有元件全部都是高優先順序,其優先順序都被設定為2000001000。

”搶佔“的邏輯由PostFilter擴充套件點來實現,後面我們主要分析kube-scheduler內建的DefaultPreemption外掛實現PostFilter時,是如何實現搶佔邏輯的。

func (pl *DefaultPreemption) PostFilter(ctx context.Context, state *framework.CycleState, pod *v1.Pod, m framework.NodeToStatusMap) (*framework.PostFilterResult, *framework.Status) {  defer func() {    metrics.PreemptionAttempts.Inc()  }()  nnn, status := pl.preempt(ctx, state, pod, m)  if !status.IsSuccess() {    return nil, status  }  // This happens when the pod is not eligible for preemption or extenders filtered all candidates.  if nnn == "" {    return nil, framework.NewStatus(framework.Unschedulable)  }  return &framework.PostFilterResult{NominatedNodeName: nnn}, framework.NewStatus(framework.Success)}

DefaultPreemption外掛首先會去獲取node列表,然後獲取最新的要執行搶佔的pod資訊,接著分下面幾步執行搶佔:

  1. 檢查是pod否可實施搶佔。呼叫PodEligibleToPreemptOthers方法,檢查當前pod是否能夠進行搶佔,如果當前的pod已經搶佔了一個node節點且該節點有pod正在執行優雅退出,那麼不應該執行搶佔。
  2. 查詢可搶佔的節點。呼叫FindCandidates找到所有node中能被搶佔的node節點,並返回候選node列表以及node節點中需要被刪除的pod(犧牲者);
  3. 尋找最佳搶佔目標。呼叫SelectCandidate方法在所有候選列表中找出最合適的node節點執行搶佔;
  4. 執行搶佔。呼叫PrepareCandidate方法刪除被搶佔的node節點中victim pod(犧牲者),以及清除犧牲者的NominatedNodeName欄位資訊;
  5. 犧牲者pod資源發生變化,被kube-sheculer監聽到,重新加入排程佇列,等待重新排程。

PodEligibleToPreemptOthers

這個方法會檢查該pod是否已經搶佔過其他node節點,如果是的話就遍歷這個節點上的所有pod物件,如果發現節點上有pod資源物件的優先順序小於待排程pod資源物件並處於terminating狀態,說明這個node正在執行低優先順序pod驅逐,已經有正在刪除的pod,等待刪除成功後,釋放資源,高優先順序的pod就會佔用這個node。

func PodEligibleToPreemptOthers(pod *v1.Pod, nodeInfos framework.NodeInfoLister, nominatedNodeStatus *framework.Status) bool {  if pod.Spec.PreemptionPolicy != nil && *pod.Spec.PreemptionPolicy == v1.PreemptNever {    klog.V(5).InfoS("Pod is not eligible for preemption because it has a preemptionPolicy of Never", "pod", klog.KObj(pod))    return false  }  nomNodeName := pod.Status.NominatedNodeName  // 檢查pod是否已經有提名node,如果有那麼說明已經執行過搶佔  if len(nomNodeName) > 0 {    if nominatedNodeStatus.Code() == framework.UnschedulableAndUnresolvable {      return true    }    // 獲取搶佔的node    if nodeInfo, _ := nodeInfos.Get(nomNodeName); nodeInfo != nil {    // 檢查這個node中是否存在正處於terminating狀態的pod,且優先順序比當前pod低      podPriority := corev1helpers.PodPriority(pod)      for _, p := range nodeInfo.Pods {        if p.Pod.DeletionTimestamp != nil && corev1helpers.PodPriority(p.Pod) < podPriority {          // There is a terminating pod on the nominated node.          return false        }      }    }  }  return true}

FindCandidates

FindCandidates方法首先會獲取node列表,然後呼叫nodesWherePreemptionMightHelp方法來找出predicates 階段失敗但是通過搶佔也許能夠排程成功的nodes,因為並不是所有的node都可以通過搶佔來排程成功。最後呼叫dryRunPreemption方法來獲取符合條件的node節點。

func (pl *DefaultPreemption) FindCandidates(ctx context.Context, state *framework.CycleState, pod *v1.Pod, m framework.NodeToStatusMap) ([]Candidate, framework.NodeToStatusMap, *framework.Status) {  allNodes, err := pl.fh.SnapshotSharedLister().NodeInfos().List()  if err != nil {    return nil, nil, framework.AsStatus(err)  }  if len(allNodes) == 0 {    return nil, nil, framework.NewStatus(framework.Error, "no nodes available")  }  // 找到可以佔用的node  potentialNodes, unschedulableNodeStatus := nodesWherePreemptionMightHelp(allNodes, m)  if len(potentialNodes) == 0 {    klog.V(3).InfoS("Preemption will not help schedule pod on any node", "pod", klog.KObj(pod))    // In this case, we should clean-up any existing nominated node name of the pod.    // 如果當前pod不存在可以搶佔的node,那麼就把pod的提名node資訊給刪掉    if err := util.ClearNominatedNodeName(pl.fh.ClientSet(), pod); err != nil {      klog.ErrorS(err, "cannot clear 'NominatedNodeName' field of pod", "pod", klog.KObj(pod))      // We do not return as this error is not critical.    }    return nil, unschedulableNodeStatus, nil  }  // 獲取PDB物件,PDB能夠限制同時終端的pod資源物件的數量,以保證叢集的高可用性  pdbs, err := getPodDisruptionBudgets(pl.pdbLister)  if err != nil {    return nil, nil, framework.AsStatus(err)  }  offset, numCandidates := pl.getOffsetAndNumCandidates(int32(len(potentialNodes)))  if klog.V(5).Enabled() {    var sample []string    for i := offset; i < offset+10 && i < int32(len(potentialNodes)); i++ {      sample = append(sample, potentialNodes[i].Node().Name)    }    klog.Infof("from a pool of %d nodes (offset: %d, sample %d nodes: %v), ~%d candidates will be chosen", len(potentialNodes), offset, len(sample), sample, numCandidates)  }  // 尋找符合條件的node,並封裝成candidate陣列返回  candidates, nodeStatuses := dryRunPreemption(ctx, pl.fh, state, pod, potentialNodes, pdbs, offset, numCandidates)  for node, status := range unschedulableNodeStatus {    nodeStatuses[node] = status  }  return candidates, nodeStatuses, nil}

SelectCandidate

這個方法裡面會呼叫candidatesToVictimsMap方法做一個node name和victims對映map,然後呼叫pickOneNodeForPreemption執行主要過濾邏輯。

func SelectCandidate(candidates []Candidate) Candidate {  // 如果沒有候選node,就直接返回nil  if len(candidates) == 0 {    return nil  }  // 如果只有一個候選node,就直接返回該node  if len(candidates) == 1 {    return candidates[0]  }  // 拿到所有候選node裡面,需要“犧牲”的pod 對映關係  victimsMap := candidatesToVictimsMap(candidates)  // 選擇一個候選node  candidateNode := pickOneNodeForPreemption(victimsMap)    // Same as candidatesToVictimsMap, this logic is not applicable for out-of-tree  // preemption plugins that exercise different candidates on the same nominated node.  if victims := victimsMap[candidateNode]; victims != nil {    return &candidate{      victims: victims,      name:    candidateNode,    }  }  // We shouldn't reach here.  klog.ErrorS(errors.New("no candidate selected"), "should not reach here", "candidates", candidates)  // To not break the whole flow, return the first candidate.  return candidates[0]}

PrepareCandidate

至此拿到了候選node,以及該node上需要犧牲的pod。

  • 刪除需要犧牲的pod。
  • 找到所有提名這個node的 pod,且優先順序比當前pod低的。
  • 去除這些pod的Nominated資訊。並且將這些pod移至activeQ佇列,讓他們重新排程

func PrepareCandidate(c Candidate, fh framework.Handle, cs kubernetes.Interface, pod *v1.Pod, pluginName string) *framework.Status {  // 刪除候選node上 需要犧牲的pod  for _, victim := range c.Victims().Pods {    // If the victim is a WaitingPod, send a reject message to the PermitPlugin.    // Otherwise we should delete the victim.    if waitingPod := fh.GetWaitingPod(victim.UID); waitingPod != nil {      waitingPod.Reject(pluginName, "preempted")    } else if err := util.DeletePod(cs, victim); err != nil {      klog.ErrorS(err, "Preempting pod", "pod", klog.KObj(victim), "preemptor", klog.KObj(pod))      return framework.AsStatus(err)    }    fh.EventRecorder().Eventf(victim, pod, v1.EventTypeNormal, "Preempted", "Preempting", "Preempted by %v/%v on node %v",      pod.Namespace, pod.Name, c.Name())  }  metrics.PreemptionVictims.Observe(float64(len(c.Victims().Pods)))  // 找到優先順序比當前pod低,且也提名了該候選node的pod。  nominatedPods := getLowerPriorityNominatedPods(fh, pod, c.Name())  // 刪除這些pod的Nominate資訊,並移至activeQ佇列  if err := util.ClearNominatedNodeName(cs, nominatedPods...); err != nil {    klog.ErrorS(err, "cannot clear 'NominatedNodeName' field")    // We do not return as this error is not critical.  }  return nil}

搶佔總結

高優先順序的pod進行”搶佔“時,會將pod的nominatedNodeName 欄位,設定為被搶佔的 Node 的名字。然後,在下一週期中決定是不是要執行在被搶佔的節點上,當這個Pod在等待的時候,如果有其他更高優先順序的 Pod 也要搶佔同一個節點,那麼排程器就會清空「被搶佔者」的spec.nominatedNodeName 欄位,從而允許更高優先順序的搶佔者執行搶佔。這也使得「被搶佔者」本身也有機會去重新搶佔其他節點。

搶佔者並不會立刻被排程到被搶佔的 node 上,排程器只會將搶佔者的 status.nominatedNodeName 欄位設定為被搶佔的 node 的名字。然後,搶佔者會重新進入下一個排程週期,在新的排程週期裡來決定是不是要執行在被搶佔的節點上,當然,即使在下一個排程週期,排程器也不會保證搶佔者一定會執行在被搶佔的節點上。這樣設計的一個重要原因是排程器只會通過標準的 DELETE API 來刪除被搶佔的 pod,所以,這些 pod 必然是有一定的“優雅退出”時間(預設是 30s)的。而在這段時間裡,其他的節點也是有可能變成可排程的,或者直接有新的節點被新增到這個叢集中來。

排程流程的二次過濾

在排程流程中Filter擴充套件點可能會執行兩次,其主要目的是為了考慮高優先順序的pod搶佔了node的場景。

第一次會呼叫addNominatedPods方法將排程佇列中找到node上優先順序大於或等於當前pod的 pod集合加入到nodeInfo物件中,然後執行FilterPlugin列表。第二次則直接執行FilterPlugins列表。之所以第一次要這麼做,是因為在pod搶佔node的邏輯中,優先順序高的pod先搶佔node,搶佔成功後將pod.Status.NominatedNodeName欄位設定成當前的node,設定完成後scheduler就跑去執行下一個pod的排程邏輯了,這時pod很可能還沒有真正在node上面跑起來。所以Scheduler的快取中其實並沒有將這類pod的資訊,所以在排程當前pod的時候,會受這些高優先順序pod的影響(pod和pod之間有pod親和性、反親和性等依賴關係),所以要先假設這類高優先順序的pod已經在這個node中跑起來了,已經佔用了節點上的一部分資源,這樣當前pod的排程是以「節點最少剩下多少資源」來執行filter擴充套件點。

為了確保萬無一失(萬一這些高優先順序的pod最終沒在這個node跑起來),還得把這些高優先的pod排除掉再執行一次filter擴充套件點。 這樣,無論其它高優先順序的pod在不在這個node上,這個pod都能確保無衝突地排程在這些node上面。

程式碼實現如下:

func (f *frameworkImpl) RunFilterPluginsWithNominatedPods(ctx context.Context, state *framework.CycleState, pod *v1.Pod, info *framework.NodeInfo) *framework.Status {  var status *framework.Status  podsAdded := false  for i := 0; i < 2; i++ {    stateToUse := state    nodeInfoToUse := info    if i == 0 {      var err error      podsAdded, stateToUse, nodeInfoToUse, err = addNominatedPods(ctx, f, pod, state, info)      if err != nil {        return framework.AsStatus(err)      }    } else if !podsAdded || !status.IsSuccess() {      break    }    statusMap := f.RunFilterPlugins(ctx, stateToUse, pod, nodeInfoToUse)    status = statusMap.Merge()    if !status.IsSuccess() && !status.IsUnschedulable() {      return status    }  }  return status}

結束語

得益於scheduler framework的設計,所有的排程策略都以外掛的形式注入kube-scheduler,並協同工作完成pod的排程繫結。排程策略的實現分散在各個外掛中,本文主要關注kube-scheduler一些重要外掛的排程邏輯實現,起到拋磚引玉的作用。

引用

https://github.com/kubernetes/enhancements/blob/master/keps/sig-scheduling/624-scheduling-framework/README.md framework設計提案

https://draveness.me/system-design-scheduler/ 排程系統設計精要

https://zhuanlan.zhihu.com/p/33823266 叢集資源排程系統設計架構

https://www.infoq.cn/article/lYUw79lJH9bZv7HrgGH5 進擊的kubernetes排程系統