原始碼解析:K8s 建立 pod 時,背後發生了什麼(四)(2021)

語言: CN / TW / HK

本文基於 2019 年的一篇文章 What happens when … Kubernetes edition! 梳理了 k8s 建立 pod (及其 deployment/replicaset) 的整個過程 , 整理了每個 重要步驟的程式碼呼叫棧 ,以 在實現層面加深對整個過程的理解

原文參考的 k8S 程式碼已經較老( v1.8 / v1.14 以及當時的 master ),且部分程式碼 連結已失效; 本文程式碼基於 v1.21

由於內容已經不與原文一一對應(有增加和刪減),因此標題未加 “[譯]” 等字樣。感謝原作者(們)的精彩文章。

篇幅太長,分成了幾部分:

  1. 原始碼解析:K8s 建立 pod 時,背後發生了什麼(一)(2021)
  2. 原始碼解析:K8s 建立 pod 時,背後發生了什麼(二)(2021)
  3. 原始碼解析:K8s 建立 pod 時,背後發生了什麼(三)(2021)
  4. 原始碼解析:K8s 建立 pod 時,背後發生了什麼(四)(2021)
  5. 原始碼解析:K8s 建立 pod 時,背後發生了什麼(五)(2021)
  • 5 Control loops(控制迴圈)
    • 5.1 Deployments controller
    • 5.2 ReplicaSets controller
      • ReplicaSets controller 啟動
      • 建立 ReplicaSet:回撥函式處理
    • 5.4 Scheduler(排程器)

5 Control loops(控制迴圈)

至此,物件已經在 etcd 中了,所有的初始化步驟也已經完成了。 下一步是設定資源拓撲(resource topology)。例如,一個 Deployment 其實就是一組 ReplicaSet,而一個 ReplicaSet 就是一組 Pod。 K8s 是如何根據一個 HTTP 請求創建出這個層級關係的呢?靠的是 K8s 內建的控制器 (controllers)。

K8s 中大量使用 “controllers”,

  • 一個 controller 就是一個 非同步指令碼 (an asynchronous script),
  • 不斷檢查資源的 當前狀態 (current state)和 期望狀態 (desired state)是否一致,
  • 如果不一致就嘗試將其變成期望狀態,這個過程稱為 reconcile

每個 controller 負責的東西都比較少, 所有 controller 並行執行, 由 kube-controller-manager 統一管理

5.1 Deployments controller

Deployments controller 啟動

當一個 Deployment record 儲存到 etcd 並(被 initializers)初始化之後, kube-apiserver 就會將其置為對外可見的。此後, Deployment controller 監聽了 Deployment 資源的變動,因此此時就會檢測到這個新建立的資源。

// pkg/controller/deployment/deployment_controller.go

// NewDeploymentController creates a new DeploymentController.
func NewDeploymentController(dInformer DeploymentInformer, rsInformer ReplicaSetInformer,
    podInformer PodInformer, client clientset.Interface) (*DeploymentController, error) {

    dc := &DeploymentController{
        client:        client,
        queue:         workqueue.NewNamedRateLimitingQueue(),
    }
    dc.rsControl = controller.RealRSControl{ // ReplicaSet controller
        KubeClient: client,
        Recorder:   dc.eventRecorder,
    }

    // 註冊 Deployment 事件回撥函式
    dInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
        AddFunc:    dc.addDeployment,    // 有 Deployment 建立時觸發
        UpdateFunc: dc.updateDeployment,
        DeleteFunc: dc.deleteDeployment,
    })
    // 註冊 ReplicaSet 事件回撥函式
    rsInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
        AddFunc:    dc.addReplicaSet,
        UpdateFunc: dc.updateReplicaSet,
        DeleteFunc: dc.deleteReplicaSet,
    })
    // 註冊 Pod 事件回撥函式
    podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
        DeleteFunc: dc.deletePod,
    })

    dc.syncHandler = dc.syncDeployment
    dc.enqueueDeployment = dc.enqueue

    return dc, nil
}

建立 Deployment:回撥函式處理

在本文場景中,觸發的是 controller 註冊的 addDeployment() 回撥函式 其所做的工作就是將 deployment 物件放到一個內部佇列:

// pkg/controller/deployment/deployment_controller.go

func (dc *DeploymentController) addDeployment(obj interface{}) {
    d := obj.(*apps.Deployment)
    dc.enqueueDeployment(d)
}

主處理迴圈

worker 不斷遍歷這個 queue,從中 dequeue item 並進行處理:

// pkg/controller/deployment/deployment_controller.go

func (dc *DeploymentController) worker() {
    for dc.processNextWorkItem() {
    }
}

func (dc *DeploymentController) processNextWorkItem() bool {
    key, quit := dc.queue.Get()
    dc.syncHandler(key.(string)) // dc.syncHandler = dc.syncDeployment
}

// syncDeployment will sync the deployment with the given key.
func (dc *DeploymentController) syncDeployment(key string) error {
    namespace, name := cache.SplitMetaNamespaceKey(key)

    deployment := dc.dLister.Deployments(namespace).Get(name)
    d := deployment.DeepCopy()

    // 獲取這個 Deployment 的所有 ReplicaSets, while reconciling ControllerRef through adoption/orphaning.
    rsList := dc.getReplicaSetsForDeployment(d)

    // 獲取這個 Deployment 的所有 pods, grouped by their ReplicaSet
    podMap := dc.getPodMapForDeployment(d, rsList)

    if d.DeletionTimestamp != nil { // 這個 Deployment 已經被標記,等待被刪除
        return dc.syncStatusOnly(d, rsList)
    }

    dc.checkPausedConditions(d)
    if d.Spec.Paused { // pause 狀態
        return dc.sync(d, rsList)
    }

    if getRollbackTo(d) != nil {
        return dc.rollback(d, rsList)
    }

    scalingEvent := dc.isScalingEvent(d, rsList)
    if scalingEvent {
        return dc.sync(d, rsList)
    }

    switch d.Spec.Strategy.Type {
    case RecreateDeploymentStrategyType:             // re-create
        return dc.rolloutRecreate(d, rsList, podMap)
    case RollingUpdateDeploymentStrategyType:        // rolling-update
        return dc.rolloutRolling(d, rsList)
    }
    return fmt.Errorf("unexpected deployment strategy type: %s", d.Spec.Strategy.Type)
}

controller 會通過 label selector 從 kube-apiserver 查詢 與這個 deployment 關聯的 ReplicaSet 或 Pod records(然後發現沒有)。

如果發現當前狀態與預期狀態不一致,就會觸發同步過程((synchronization process))。 這個同步過程是無狀態的,也就是說,它並不區分是新記錄還是老記錄,一視同仁。

執行擴容(scale up)

如上,發現 pod 不存在之後,它會開始擴容過程(scaling process):

// pkg/controller/deployment/sync.go

// scale up/down 或新建立(pause)時都會執行到這裡
func (dc *DeploymentController) sync(d *apps.Deployment, rsList []*apps.ReplicaSet) error {

    newRS, oldRSs := dc.getAllReplicaSetsAndSyncRevision(d, rsList, false)
    dc.scale(d, newRS, oldRSs)

    // Clean up the deployment when it's paused and no rollback is in flight.
    if d.Spec.Paused && getRollbackTo(d) == nil {
        dc.cleanupDeployment(oldRSs, d)
    }

    allRSs := append(oldRSs, newRS)
    return dc.syncDeploymentStatus(allRSs, newRS, d)
}

大致步驟:

  1. Rolling out (例如 creating)一個 ReplicaSet resource
  2. 分配一個 label selector
  3. 初始版本好(revision number)置為 1

ReplicaSet 的 PodSpec,以及其他一些 metadata 是從 Deployment 的 manifest 拷過來的。

最後會更新 deployment 狀態,然後重新進入 reconciliation 迴圈,直到 deployment 進入預期的狀態。

小結

由於 Deployment controller 只負責 ReplicaSet 的建立 ,因此下一步 (ReplicaSet -> Pod)要由 reconciliation 過程中的另一個 controller —— ReplicaSet controller 來完成。

5.2 ReplicaSets controller

上一步周,Deployments controller 已經建立了 Deployment 的第一個 ReplicaSet,但此時還沒有任何 Pod。 下面就輪到 ReplicaSet controller 出場了。 它的任務是監控 ReplicaSet 及其依賴資源(pods)的生命週期,實現方式也是註冊事件回撥函式。

ReplicaSets controller 啟動

// pkg/controller/replicaset/replica_set.go

func NewReplicaSetController(rsInformer ReplicaSetInformer, podInformer PodInformer,
    kubeClient clientset.Interface, burstReplicas int) *ReplicaSetController {

    return NewBaseController(rsInformer, podInformer, kubeClient, burstReplicas,
        apps.SchemeGroupVersion.WithKind("ReplicaSet"),
        "replicaset_controller",
        "replicaset",
        controller.RealPodControl{
            KubeClient: kubeClient,
        },
    )
}

// 抽象出 NewBaseController() 是為了程式碼複用,例如 NewReplicationController() 也會呼叫這個函式。
func NewBaseController(rsInformer, podInformer, kubeClient clientset.Interface, burstReplicas int,
    gvk GroupVersionKind, metricOwnerName, queueName, podControl PodControlInterface) *ReplicaSetController {

    rsc := &ReplicaSetController{
        kubeClient:       kubeClient,
        podControl:       podControl,
        burstReplicas:    burstReplicas,
        expectations:     controller.NewUIDTrackingControllerExpectations(NewControllerExpectations()),
        queue:            workqueue.NewNamedRateLimitingQueue()
    }

    rsInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
        AddFunc:    rsc.addRS,
        UpdateFunc: rsc.updateRS,
        DeleteFunc: rsc.deleteRS,
    })
    rsc.rsLister = rsInformer.Lister()

    podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
        AddFunc: rsc.addPod,
        UpdateFunc: rsc.updatePod,
        DeleteFunc: rsc.deletePod,
    })
    rsc.podLister = podInformer.Lister()

    rsc.syncHandler = rsc.syncReplicaSet
    return rsc
}

建立 ReplicaSet:回撥函式處理

主處理迴圈

當一個 ReplicaSet 被(Deployment controller)建立之後,

// pkg/controller/replicaset/replica_set.go

// syncReplicaSet will sync the ReplicaSet with the given key if it has had its expectations fulfilled,
// meaning it did not expect to see any more of its pods created or deleted.
func (rsc *ReplicaSetController) syncReplicaSet(key string) error {

    namespace, name := cache.SplitMetaNamespaceKey(key)
    rs := rsc.rsLister.ReplicaSets(namespace).Get(name)

    selector := metav1.LabelSelectorAsSelector(rs.Spec.Selector)

    // 包括那些不匹配 rs selector,但有 stale controller ref 的 pod
    allPods := rsc.podLister.Pods(rs.Namespace).List(labels.Everything())
    filteredPods := controller.FilterActivePods(allPods) // Ignore inactive pods.
    filteredPods = rsc.claimPods(rs, selector, filteredPods)

    if rsNeedsSync && rs.DeletionTimestamp == nil { // 需要同步,並且沒有被標記待刪除
        rsc.manageReplicas(filteredPods, rs)        // *主處理邏輯*
    }

    newStatus := calculateStatus(rs, filteredPods, manageReplicasErr)
    updatedRS := updateReplicaSetStatus(AppsV1().ReplicaSets(rs.Namespace), rs, newStatus)
}

RS controller 檢查 ReplicaSet 的狀態, 發現當前狀態和期望狀態之間有偏差(skew),因此接下來呼叫 manageReplicas() 來 reconcile 這個狀態,在這裡做的事情就是增加這個 ReplicaSet 的 pod 數量。

// pkg/controller/replicaset/replica_set.go

func (rsc *ReplicaSetController) manageReplicas(filteredPods []*v1.Pod, rs *apps.ReplicaSet) error {
    diff := len(filteredPods) - int(*(rs.Spec.Replicas))
    rsKey := controller.KeyFunc(rs)

    if diff < 0 {
        diff *= -1
        if diff > rsc.burstReplicas {
            diff = rsc.burstReplicas
        }

        rsc.expectations.ExpectCreations(rsKey, diff)
        successfulCreations := slowStartBatch(diff, controller.SlowStartInitialBatchSize, func() {
            return rsc.podControl.CreatePodsWithControllerRef( // 擴容
                // 呼叫棧 CreatePodsWithControllerRef -> createPod() -> Client.CoreV1().Pods().Create()
                rs.Namespace, &rs.Spec.Template, rs, metav1.NewControllerRef(rs, rsc.GroupVersionKind))
        })

        // The skipped pods will be retried later. The next controller resync will retry the slow start process.
        if skippedPods := diff - successfulCreations; skippedPods > 0 {
            for i := 0; i < skippedPods; i++ {
                // Decrement the expected number of creates because the informer won't observe this pod
                rsc.expectations.CreationObserved(rsKey)
            }
        }
        return err
    } else if diff > 0 {
        if diff > rsc.burstReplicas {
            diff = rsc.burstReplicas
        }

        relatedPods := rsc.getIndirectlyRelatedPods(rs)
        podsToDelete := getPodsToDelete(filteredPods, relatedPods, diff)
        rsc.expectations.ExpectDeletions(rsKey, getPodKeys(podsToDelete))

        for _, pod := range podsToDelete {
            go func(targetPod *v1.Pod) {
                rsc.podControl.DeletePod(rs.Namespace, targetPod.Name, rs) // 縮容
            }(pod)
        }
    }

    return nil
}

增加 pod 數量的操作比較小心,每次最多不超過 burst count(這個配置是從 ReplicaSet 的父物件 Deployment 那裡繼承來的)。

另外,建立 Pods 的過程是 批處理的 , “慢啟動”操,開始時是 SlowStartInitialBatchSize ,每執行成功一批,下次的 batch size 就翻倍。 這樣設計是為了避免給 kube-apiserver 造成不必要的壓力,例如,如果由於 quota 不足,這批 pod 大部分都會失敗,那 這種方式只會有一小批請求到達 kube-apiserver,而如果一把全上的話,請求全部會打過去。 同樣是失敗,這種失敗方式比較優雅。

Owner reference

K8s 通過 Owner Reference (子資源中的一個欄位,指向的是其父資源的 ID) 維護物件層級 (hierarchy)。這可以帶來兩方面好處:

  1. 實現了 cascading deletion,即父物件被 GC 時會確保 GC 子物件;
  2. 父物件之間不會出現競爭子物件的情況(例如,兩個父物件認為某個子物件都是自己的)

另一個隱藏的好處是:Owner Reference 是有狀態的:如果 controller 重啟,重啟期間不會影響 系統的其他部分,因為資源拓撲(resource topology)是獨立於 controller 的。 這種隔離設計也體現在 controller 自己的設計中: controller 不應該操作 其他 controller 的資源 (resources they don’t explicitly own)。

有時也可能會出現“孤兒”資源(”orphaned” resources)的情況,例如

  1. 父資源刪除了,子資源還在;
  2. GC 策略導致子資源無法被刪除。

這種情況發生時, controller 會確保孤兒資源會被某個新的父資源收養 。 多個父資源都可以競爭成為孤兒資源的父資源,但只有一個會成功(其餘的會收到一個 validation 錯誤)。

5.3 Informers

很多 controller(例如 RBAC authorizer 或 Deployment controller)需要將叢集資訊拉到本地。

例如 RBAC authorizer 中,authenticator 會將使用者資訊儲存到請求上下文中。隨後, RBAC authorizer 會用這個資訊獲取 etcd 中所有與這個使用者相關的 role 和 role bindings。

那麼,controller 是如何訪問和修改這些資源的?在 K8s 中,這是通過 informer 機制實現的。

informer 是一種 controller 訂閱儲存(etcd)事件的機制 ,能方便地獲取它們感興趣的資源。

  • 這種方式除了提供一種很好的抽象之外,還負責處理快取(caching,非常重要,因為可 以減少 kube-apiserver 連線數,降低 controller 測和 kube-apiserver 側的序列化 成本)問題。
  • 此外,這種設計還使得 controller 的行為是 threadsafe 的,避免影響其他元件或服務。

關於 informer 和 controller 的聯合工作機制,可參考 這篇部落格

5.4 Scheduler(排程器)

以上 controllers 執行完各自的處理之後,etcd 中已經有了一個 Deployment、一個 ReplicaSet 和三個 Pods,可以通過 kube-apiserver 查詢到。 但此時, 這三個 pod 還卡在 Pending 狀態,因為它們還沒有被排程到任何節點 另外一個 controller —— 排程器 —— 負責做這件事情。

scheduler 作為控制平面的一個獨立服務執行,但 工作方式與其他 controller 是一樣的 : 監聽事件,然後嘗試 reconcile 狀態。

呼叫棧概覽

Run // pkg/scheduler/scheduler.go 
  |-SchedulingQueue.Run()
  |
  |-scheduleOne()
     |-bind
     |  |-RunBindPlugins
     |     |-runBindPlugins
     |        |-Bind
     |-sched.Algorithm.Schedule(pod)
        |-findNodesThatFitPod
        |-prioritizeNodes
        |-selectHost

排程過程

// pkg/scheduler/core/generic_scheduler.go

// 將 pod 排程到指定 node list 中的某臺 node 上
func (g *genericScheduler) Schedule(ctx context.Context, fwk framework.Framework,
    state *framework.CycleState, pod *v1.Pod) (result ScheduleResult, err error) {

    feasibleNodes, diagnosis := g.findNodesThatFitPod(ctx, fwk, state, pod) // 過濾可用 nodes
    if len(feasibleNodes) == 0 {
        return result, &framework.FitError{}
    }

    if len(feasibleNodes) == 1 { // 可用 node 只有一個,就選它了
        return ScheduleResult{SuggestedHost:  feasibleNodes[0].Name}, nil
    }

    priorityList := g.prioritizeNodes(ctx, fwk, state, pod, feasibleNodes)
    host := g.selectHost(priorityList)

    return ScheduleResult{
        SuggestedHost:  host,
        EvaluatedNodes: len(feasibleNodes) + len(diagnosis.NodeToStatusMap),
        FeasibleNodes:  len(feasibleNodes),
    }, err
}

// Filters nodes that fit the pod based on the framework filter plugins and filter extenders.
func (g *genericScheduler) findNodesThatFitPod(ctx context.Context, fwk framework.Framework,
    state *framework.CycleState, pod *v1.Pod) ([]*v1.Node, framework.Diagnosis, error) {

    diagnosis := framework.Diagnosis{
        NodeToStatusMap:      make(framework.NodeToStatusMap),
        UnschedulablePlugins: sets.NewString(),
    }

    // Run "prefilter" plugins.
    s := fwk.RunPreFilterPlugins(ctx, state, pod)
    allNodes := g.nodeInfoSnapshot.NodeInfos().List()

    if len(pod.Status.NominatedNodeName) > 0 && featureGate.Enabled(features.PreferNominatedNode) {
        feasibleNodes := g.evaluateNominatedNode(ctx, pod, fwk, state, diagnosis)
        if len(feasibleNodes) != 0 {
            return feasibleNodes, diagnosis, nil
        }
    }

    feasibleNodes := g.findNodesThatPassFilters(ctx, fwk, state, pod, diagnosis, allNodes)
    feasibleNodes = g.findNodesThatPassExtenders(pod, feasibleNodes, diagnosis.NodeToStatusMap)
    return feasibleNodes, diagnosis, nil
}

它會過濾 過濾 PodSpect 中 NodeName 欄位為空的 pods ,嘗試為這樣的 pods 挑選一個 node 排程上去。

排程演算法

下面簡單看下內建的預設排程演算法。

註冊預設 predicates

這些 predicates 其實都是函式,被呼叫到時,執行相應的 過濾 。 例如, 如果 PodSpec 裡面顯式要求了 CPU 或 RAM 資源,而一個 node 無法滿足這些條件 , 那就會將這個 node 從備選列表中刪除。

// pkg/scheduler/algorithmprovider/registry.go

// NewRegistry returns an algorithm provider registry instance.
func NewRegistry() Registry {
    defaultConfig := getDefaultConfig()
    applyFeatureGates(defaultConfig)

    caConfig := getClusterAutoscalerConfig()
    applyFeatureGates(caConfig)

    return Registry{
        schedulerapi.SchedulerDefaultProviderName: defaultConfig,
        ClusterAutoscalerProvider:                 caConfig,
    }
}

func getDefaultConfig() *schedulerapi.Plugins {
    plugins := &schedulerapi.Plugins{
        PreFilter: schedulerapi.PluginSet{...},
        Filter: schedulerapi.PluginSet{
            Enabled: []schedulerapi.Plugin{
                {Name: nodename.Name},        // 指定 node name 排程
                {Name: tainttoleration.Name}, // 指定 toleration 排程
                {Name: nodeaffinity.Name},    // 指定 node affinity 排程
                ...
            },
        },
        PostFilter: schedulerapi.PluginSet{...},
        PreScore: schedulerapi.PluginSet{...},
        Score: schedulerapi.PluginSet{
            Enabled: []schedulerapi.Plugin{
                {Name: interpodaffinity.Name, Weight: 1},
                {Name: nodeaffinity.Name, Weight: 1},
                {Name: tainttoleration.Name, Weight: 1},
                ...
            },
        },
        Reserve: schedulerapi.PluginSet{...},
        PreBind: schedulerapi.PluginSet{...},
        Bind: schedulerapi.PluginSet{...},
    }

    return plugins
}

plugin 的實現見 pkg/scheduler/framework/plugins/ ,以 nodename filter 為例:

// pkg/scheduler/framework/plugins/nodename/node_name.go

// Filter invoked at the filter extension point.
func (pl *NodeName) Filter(ctx context.Context, pod *v1.Pod, nodeInfo *framework.NodeInfo) *framework.Status {
    if !Fits(pod, nodeInfo) {
        return framework.NewStatus(UnschedulableAndUnresolvable, ErrReason)
    }
    return nil
}

// 如果 pod 沒有指定 NodeName,或者指定的 NodeName 等於該 node 的 name,返回 true;其他返回 false
func Fits(pod *v1.Pod, nodeInfo *framework.NodeInfo) bool {
    return len(pod.Spec.NodeName) == 0 || pod.Spec.NodeName == nodeInfo.Node().Name
}

對篩選出的 node 排序

選擇了合適的 nodes 之後,接下來會執行一系列 priority function 對這些 nodes 進行排序 。 例如,如果演算法是希望將 pods 儘量分散到整個叢集,那 priority 會選擇資源儘量空閒的節點。

這些函式會給每個 node 打分, 得分最高的 node 會被選中 ,排程到該節點。

// pkg/scheduler/core/generic_scheduler.go

// 執行打分外掛(score plugins)對 nodes 進行排序。
func (g *genericScheduler) prioritizeNodes(ctx context.Context, fwk framework.Framework,
    state *framework.CycleState, pod *v1.Pod, nodes []*v1.Node,) (framework.NodeScoreList, error) {

    // 如果沒有指定 priority 配置,所有 node 將都得 1 分。
    if len(g.extenders) == 0 && !fwk.HasScorePlugins() {
        result := make(framework.NodeScoreList, 0, len(nodes))
        for i := range nodes {
            result = append(result, framework.NodeScore{ Name:  nodes[i].Name, Score: 1 })
        }
        return result, nil
    }

    preScoreStatus := fwk.RunPreScorePlugins(ctx, state, pod, nodes)       // PreScoe 外掛
    scoresMap, scoreStatus := fwk.RunScorePlugins(ctx, state, pod, nodes)  // Score 外掛

    result := make(framework.NodeScoreList, 0, len(nodes))
    for i := range nodes {
        result = append(result, framework.NodeScore{Name: nodes[i].Name, Score: 0})
        for j := range scoresMap {
            result[i].Score += scoresMap[j][i].Score
        }
    }

    if len(g.extenders) != 0 && nodes != nil {
        combinedScores := make(map[string]int64, len(nodes))
        for i := range g.extenders {
            if !g.extenders[i].IsInterested(pod) {
                continue
            }
            go func(extIndex int) {
                prioritizedList, weight := g.extenders[extIndex].Prioritize(pod, nodes)
                for i := range *prioritizedList {
                    host, score := (*prioritizedList)[i].Host, (*prioritizedList)[i].Score
                    combinedScores[host] += score * weight
                }
            }(i)
        }

        for i := range result {
            result[i].Score += combinedScores[result[i].Name] * (MaxNodeScore / MaxExtenderPriority)
        }
    }

    return result, nil
}

建立 v1.Binding 物件

演算法選出一個 node 之後,排程器會 建立一個 Binding 物件 , Pod 的 ObjectReference 欄位的值就是選中的 node 的名字

// pkg/scheduler/framework/runtime/framework.go

func (f *frameworkImpl) runBindPlugin(ctx context.Context, bp BindPlugin, state *CycleState,
    pod *v1.Pod, nodeName string) *framework.Status {

    if !state.ShouldRecordPluginMetrics() {
        return bp.Bind(ctx, state, pod, nodeName)
    }

    status := bp.Bind(ctx, state, pod, nodeName)
    return status
}
// pkg/scheduler/framework/plugins/defaultbinder/default_binder.go

// Bind binds pods to nodes using the k8s client.
func (b DefaultBinder) Bind(ctx, state *CycleState, p *v1.Pod, nodeName string) *framework.Status {
    binding := &v1.Binding{
        ObjectMeta: metav1.ObjectMeta{Namespace: p.Namespace, Name: p.Name, UID: p.UID},
        Target:     v1.ObjectReference{Kind: "Node", Name: nodeName}, // ObjectReference 欄位為 nodeName
    }

    b.handle.ClientSet().CoreV1().Pods(binding.Namespace).Bind(ctx, binding, metav1.CreateOptions{})
}

如上,最後 ClientSet().CoreV1().Pods(binding.Namespace).Bind() 通過一個 POST 請求發給 apiserver

kube-apiserver 更新 pod 物件

kube-apiserver 收到這個 Binding object 請求後,registry 反序列化物件,更新 Pod 物件的下列欄位:

  • 設定 NodeName
  • 新增 annotations
  • 設定 PodScheduled status 為 True
// pkg/registry/core/pod/storage/storage.go

func (r *BindingREST) setPodHostAndAnnotations(ctx context.Context, podID, oldMachine, machine string,
    annotations map[string]string, dryRun bool) (finalPod *api.Pod, err error) {

    podKey := r.store.KeyFunc(ctx, podID)
    r.store.Storage.GuaranteedUpdate(ctx, podKey, &api.Pod{}, false, nil,
        storage.SimpleUpdate(func(obj runtime.Object) (runtime.Object, error) {

        pod, ok := obj.(*api.Pod)
        pod.Spec.NodeName = machine
        if pod.Annotations == nil {
            pod.Annotations = make(map[string]string)
        }
        for k, v := range annotations {
            pod.Annotations[k] = v
        }
        podutil.UpdatePodCondition(&pod.Status, &api.PodCondition{
            Type:   api.PodScheduled,
            Status: api.ConditionTrue,
        })

        return pod, nil
    }), dryRun, nil)
}

自定義排程器

predicate 和 priority function 都是可擴充套件的,可以通過 --policy-config-file 指定。

K8s 還可以自定義排程器(自己實現排程邏輯)。 如果 PodSpec 中 schedulerName 欄位不為空 ,K8s 就會 將這個 pod 的排程權交給指定的排程器。

5.5 小結

總結一下前面已經完成的步驟:

  1. HTTP 請求通過了認證、鑑權、admission control
  2. Deployment, ReplicaSet 和 Pod resources 已經持久化到 etcd
  3. 一系列 initializers 已經執行完畢,
  4. 每個 Pod 也已經排程到了合適的 node 上。

但是, 到目前為止,我們看到的所有東西(狀態),還只是存在於 etcd 中的元資料 。 下一步就是將這些狀態同步到計算節點上,然後計算節點上的 agent(kubelet)就開始幹活了。