原始碼解析:K8s 建立 pod 時,背後發生了什麼(四)(2021)
本文基於 2019 年的一篇文章 What happens when … Kubernetes edition! 梳理了 k8s 建立 pod (及其 deployment/replicaset) 的整個過程 , 整理了每個 重要步驟的程式碼呼叫棧 ,以 在實現層面加深對整個過程的理解 。
原文參考的 k8S 程式碼已經較老( v1.8
/ v1.14
以及當時的 master
),且部分程式碼
連結已失效;
本文程式碼基於
v1.21
。
由於內容已經不與原文一一對應(有增加和刪減),因此標題未加 “[譯]” 等字樣。感謝原作者(們)的精彩文章。
篇幅太長,分成了幾部分:
- 原始碼解析:K8s 建立 pod 時,背後發生了什麼(一)(2021)
- 原始碼解析:K8s 建立 pod 時,背後發生了什麼(二)(2021)
- 原始碼解析:K8s 建立 pod 時,背後發生了什麼(三)(2021)
- 原始碼解析:K8s 建立 pod 時,背後發生了什麼(四)(2021)
- 原始碼解析:K8s 建立 pod 時,背後發生了什麼(五)(2021)
-
5 Control loops(控制迴圈)
- 5.1 Deployments controller
-
5.2 ReplicaSets controller
- ReplicaSets controller 啟動
- 建立 ReplicaSet:回撥函式處理
- 5.4 Scheduler(排程器)
5 Control loops(控制迴圈)
至此,物件已經在 etcd 中了,所有的初始化步驟也已經完成了。 下一步是設定資源拓撲(resource topology)。例如,一個 Deployment 其實就是一組 ReplicaSet,而一個 ReplicaSet 就是一組 Pod。 K8s 是如何根據一個 HTTP 請求創建出這個層級關係的呢?靠的是 K8s 內建的控制器 (controllers)。
K8s 中大量使用 “controllers”,
- 一個 controller 就是一個 非同步指令碼 (an asynchronous script),
- 不斷檢查資源的 當前狀態 (current state)和 期望狀態 (desired state)是否一致,
- 如果不一致就嘗試將其變成期望狀態,這個過程稱為 reconcile 。
每個 controller 負責的東西都比較少, 所有 controller 並行執行, 由 kube-controller-manager 統一管理 。
5.1 Deployments controller
Deployments controller 啟動
當一個 Deployment record 儲存到 etcd 並(被 initializers)初始化之後, kube-apiserver 就會將其置為對外可見的。此後, Deployment controller 監聽了 Deployment 資源的變動,因此此時就會檢測到這個新建立的資源。
// pkg/controller/deployment/deployment_controller.go // NewDeploymentController creates a new DeploymentController. func NewDeploymentController(dInformer DeploymentInformer, rsInformer ReplicaSetInformer, podInformer PodInformer, client clientset.Interface) (*DeploymentController, error) { dc := &DeploymentController{ client: client, queue: workqueue.NewNamedRateLimitingQueue(), } dc.rsControl = controller.RealRSControl{ // ReplicaSet controller KubeClient: client, Recorder: dc.eventRecorder, } // 註冊 Deployment 事件回撥函式 dInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ AddFunc: dc.addDeployment, // 有 Deployment 建立時觸發 UpdateFunc: dc.updateDeployment, DeleteFunc: dc.deleteDeployment, }) // 註冊 ReplicaSet 事件回撥函式 rsInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ AddFunc: dc.addReplicaSet, UpdateFunc: dc.updateReplicaSet, DeleteFunc: dc.deleteReplicaSet, }) // 註冊 Pod 事件回撥函式 podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ DeleteFunc: dc.deletePod, }) dc.syncHandler = dc.syncDeployment dc.enqueueDeployment = dc.enqueue return dc, nil }
建立 Deployment:回撥函式處理
在本文場景中,觸發的是 controller 註冊的 addDeployment() 回撥函式 其所做的工作就是將 deployment 物件放到一個內部佇列:
// pkg/controller/deployment/deployment_controller.go func (dc *DeploymentController) addDeployment(obj interface{}) { d := obj.(*apps.Deployment) dc.enqueueDeployment(d) }
主處理迴圈
worker 不斷遍歷這個 queue,從中 dequeue item 並進行處理:
// pkg/controller/deployment/deployment_controller.go func (dc *DeploymentController) worker() { for dc.processNextWorkItem() { } } func (dc *DeploymentController) processNextWorkItem() bool { key, quit := dc.queue.Get() dc.syncHandler(key.(string)) // dc.syncHandler = dc.syncDeployment } // syncDeployment will sync the deployment with the given key. func (dc *DeploymentController) syncDeployment(key string) error { namespace, name := cache.SplitMetaNamespaceKey(key) deployment := dc.dLister.Deployments(namespace).Get(name) d := deployment.DeepCopy() // 獲取這個 Deployment 的所有 ReplicaSets, while reconciling ControllerRef through adoption/orphaning. rsList := dc.getReplicaSetsForDeployment(d) // 獲取這個 Deployment 的所有 pods, grouped by their ReplicaSet podMap := dc.getPodMapForDeployment(d, rsList) if d.DeletionTimestamp != nil { // 這個 Deployment 已經被標記,等待被刪除 return dc.syncStatusOnly(d, rsList) } dc.checkPausedConditions(d) if d.Spec.Paused { // pause 狀態 return dc.sync(d, rsList) } if getRollbackTo(d) != nil { return dc.rollback(d, rsList) } scalingEvent := dc.isScalingEvent(d, rsList) if scalingEvent { return dc.sync(d, rsList) } switch d.Spec.Strategy.Type { case RecreateDeploymentStrategyType: // re-create return dc.rolloutRecreate(d, rsList, podMap) case RollingUpdateDeploymentStrategyType: // rolling-update return dc.rolloutRolling(d, rsList) } return fmt.Errorf("unexpected deployment strategy type: %s", d.Spec.Strategy.Type) }
controller 會通過 label selector 從 kube-apiserver 查詢 與這個 deployment 關聯的 ReplicaSet 或 Pod records(然後發現沒有)。
如果發現當前狀態與預期狀態不一致,就會觸發同步過程((synchronization process))。 這個同步過程是無狀態的,也就是說,它並不區分是新記錄還是老記錄,一視同仁。
執行擴容(scale up)
如上,發現 pod 不存在之後,它會開始擴容過程(scaling process):
// pkg/controller/deployment/sync.go // scale up/down 或新建立(pause)時都會執行到這裡 func (dc *DeploymentController) sync(d *apps.Deployment, rsList []*apps.ReplicaSet) error { newRS, oldRSs := dc.getAllReplicaSetsAndSyncRevision(d, rsList, false) dc.scale(d, newRS, oldRSs) // Clean up the deployment when it's paused and no rollback is in flight. if d.Spec.Paused && getRollbackTo(d) == nil { dc.cleanupDeployment(oldRSs, d) } allRSs := append(oldRSs, newRS) return dc.syncDeploymentStatus(allRSs, newRS, d) }
大致步驟:
- Rolling out (例如 creating)一個 ReplicaSet resource
- 分配一個 label selector
- 初始版本好(revision number)置為 1
ReplicaSet 的 PodSpec,以及其他一些 metadata 是從 Deployment 的 manifest 拷過來的。
最後會更新 deployment 狀態,然後重新進入 reconciliation 迴圈,直到 deployment 進入預期的狀態。
小結
由於 Deployment controller 只負責 ReplicaSet 的建立 ,因此下一步 (ReplicaSet -> Pod)要由 reconciliation 過程中的另一個 controller —— ReplicaSet controller 來完成。
5.2 ReplicaSets controller
上一步周,Deployments controller 已經建立了 Deployment 的第一個 ReplicaSet,但此時還沒有任何 Pod。 下面就輪到 ReplicaSet controller 出場了。 它的任務是監控 ReplicaSet 及其依賴資源(pods)的生命週期,實現方式也是註冊事件回撥函式。
ReplicaSets controller 啟動
// pkg/controller/replicaset/replica_set.go func NewReplicaSetController(rsInformer ReplicaSetInformer, podInformer PodInformer, kubeClient clientset.Interface, burstReplicas int) *ReplicaSetController { return NewBaseController(rsInformer, podInformer, kubeClient, burstReplicas, apps.SchemeGroupVersion.WithKind("ReplicaSet"), "replicaset_controller", "replicaset", controller.RealPodControl{ KubeClient: kubeClient, }, ) } // 抽象出 NewBaseController() 是為了程式碼複用,例如 NewReplicationController() 也會呼叫這個函式。 func NewBaseController(rsInformer, podInformer, kubeClient clientset.Interface, burstReplicas int, gvk GroupVersionKind, metricOwnerName, queueName, podControl PodControlInterface) *ReplicaSetController { rsc := &ReplicaSetController{ kubeClient: kubeClient, podControl: podControl, burstReplicas: burstReplicas, expectations: controller.NewUIDTrackingControllerExpectations(NewControllerExpectations()), queue: workqueue.NewNamedRateLimitingQueue() } rsInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ AddFunc: rsc.addRS, UpdateFunc: rsc.updateRS, DeleteFunc: rsc.deleteRS, }) rsc.rsLister = rsInformer.Lister() podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ AddFunc: rsc.addPod, UpdateFunc: rsc.updatePod, DeleteFunc: rsc.deletePod, }) rsc.podLister = podInformer.Lister() rsc.syncHandler = rsc.syncReplicaSet return rsc }
建立 ReplicaSet:回撥函式處理
主處理迴圈
當一個 ReplicaSet 被(Deployment controller)建立之後,
// pkg/controller/replicaset/replica_set.go // syncReplicaSet will sync the ReplicaSet with the given key if it has had its expectations fulfilled, // meaning it did not expect to see any more of its pods created or deleted. func (rsc *ReplicaSetController) syncReplicaSet(key string) error { namespace, name := cache.SplitMetaNamespaceKey(key) rs := rsc.rsLister.ReplicaSets(namespace).Get(name) selector := metav1.LabelSelectorAsSelector(rs.Spec.Selector) // 包括那些不匹配 rs selector,但有 stale controller ref 的 pod allPods := rsc.podLister.Pods(rs.Namespace).List(labels.Everything()) filteredPods := controller.FilterActivePods(allPods) // Ignore inactive pods. filteredPods = rsc.claimPods(rs, selector, filteredPods) if rsNeedsSync && rs.DeletionTimestamp == nil { // 需要同步,並且沒有被標記待刪除 rsc.manageReplicas(filteredPods, rs) // *主處理邏輯* } newStatus := calculateStatus(rs, filteredPods, manageReplicasErr) updatedRS := updateReplicaSetStatus(AppsV1().ReplicaSets(rs.Namespace), rs, newStatus) }
RS controller 檢查 ReplicaSet 的狀態,
發現當前狀態和期望狀態之間有偏差(skew),因此接下來呼叫 manageReplicas()
來 reconcile
這個狀態,在這裡做的事情就是增加這個 ReplicaSet 的 pod 數量。
// pkg/controller/replicaset/replica_set.go func (rsc *ReplicaSetController) manageReplicas(filteredPods []*v1.Pod, rs *apps.ReplicaSet) error { diff := len(filteredPods) - int(*(rs.Spec.Replicas)) rsKey := controller.KeyFunc(rs) if diff < 0 { diff *= -1 if diff > rsc.burstReplicas { diff = rsc.burstReplicas } rsc.expectations.ExpectCreations(rsKey, diff) successfulCreations := slowStartBatch(diff, controller.SlowStartInitialBatchSize, func() { return rsc.podControl.CreatePodsWithControllerRef( // 擴容 // 呼叫棧 CreatePodsWithControllerRef -> createPod() -> Client.CoreV1().Pods().Create() rs.Namespace, &rs.Spec.Template, rs, metav1.NewControllerRef(rs, rsc.GroupVersionKind)) }) // The skipped pods will be retried later. The next controller resync will retry the slow start process. if skippedPods := diff - successfulCreations; skippedPods > 0 { for i := 0; i < skippedPods; i++ { // Decrement the expected number of creates because the informer won't observe this pod rsc.expectations.CreationObserved(rsKey) } } return err } else if diff > 0 { if diff > rsc.burstReplicas { diff = rsc.burstReplicas } relatedPods := rsc.getIndirectlyRelatedPods(rs) podsToDelete := getPodsToDelete(filteredPods, relatedPods, diff) rsc.expectations.ExpectDeletions(rsKey, getPodKeys(podsToDelete)) for _, pod := range podsToDelete { go func(targetPod *v1.Pod) { rsc.podControl.DeletePod(rs.Namespace, targetPod.Name, rs) // 縮容 }(pod) } } return nil }
增加 pod 數量的操作比較小心,每次最多不超過 burst count(這個配置是從 ReplicaSet 的父物件 Deployment 那裡繼承來的)。
另外,建立 Pods 的過程是 批處理的
,
“慢啟動”操,開始時是 SlowStartInitialBatchSize
,每執行成功一批,下次的 batch size 就翻倍。
這樣設計是為了避免給 kube-apiserver 造成不必要的壓力,例如,如果由於 quota 不足,這批 pod 大部分都會失敗,那
這種方式只會有一小批請求到達 kube-apiserver,而如果一把全上的話,請求全部會打過去。
同樣是失敗,這種失敗方式比較優雅。
Owner reference
K8s 通過 Owner Reference (子資源中的一個欄位,指向的是其父資源的 ID) 維護物件層級 (hierarchy)。這可以帶來兩方面好處:
- 實現了 cascading deletion,即父物件被 GC 時會確保 GC 子物件;
- 父物件之間不會出現競爭子物件的情況(例如,兩個父物件認為某個子物件都是自己的)
另一個隱藏的好處是:Owner Reference 是有狀態的:如果 controller 重啟,重啟期間不會影響 系統的其他部分,因為資源拓撲(resource topology)是獨立於 controller 的。 這種隔離設計也體現在 controller 自己的設計中: controller 不應該操作 其他 controller 的資源 (resources they don’t explicitly own)。
有時也可能會出現“孤兒”資源(”orphaned” resources)的情況,例如
- 父資源刪除了,子資源還在;
- GC 策略導致子資源無法被刪除。
這種情況發生時, controller 會確保孤兒資源會被某個新的父資源收養 。 多個父資源都可以競爭成為孤兒資源的父資源,但只有一個會成功(其餘的會收到一個 validation 錯誤)。
5.3 Informers
很多 controller(例如 RBAC authorizer 或 Deployment controller)需要將叢集資訊拉到本地。
例如 RBAC authorizer 中,authenticator 會將使用者資訊儲存到請求上下文中。隨後, RBAC authorizer 會用這個資訊獲取 etcd 中所有與這個使用者相關的 role 和 role bindings。
那麼,controller 是如何訪問和修改這些資源的?在 K8s 中,這是通過 informer 機制實現的。
informer 是一種 controller 訂閱儲存(etcd)事件的機制 ,能方便地獲取它們感興趣的資源。
- 這種方式除了提供一種很好的抽象之外,還負責處理快取(caching,非常重要,因為可 以減少 kube-apiserver 連線數,降低 controller 測和 kube-apiserver 側的序列化 成本)問題。
- 此外,這種設計還使得 controller 的行為是 threadsafe 的,避免影響其他元件或服務。
關於 informer 和 controller 的聯合工作機制,可參考 這篇部落格 。
5.4 Scheduler(排程器)
以上 controllers 執行完各自的處理之後,etcd 中已經有了一個 Deployment、一個 ReplicaSet 和三個 Pods,可以通過 kube-apiserver 查詢到。 但此時, 這三個 pod 還卡在 Pending 狀態,因為它們還沒有被排程到任何節點 。 另外一個 controller —— 排程器 —— 負責做這件事情。
scheduler 作為控制平面的一個獨立服務執行,但 工作方式與其他 controller 是一樣的 : 監聽事件,然後嘗試 reconcile 狀態。
呼叫棧概覽
Run // pkg/scheduler/scheduler.go |-SchedulingQueue.Run() | |-scheduleOne() |-bind | |-RunBindPlugins | |-runBindPlugins | |-Bind |-sched.Algorithm.Schedule(pod) |-findNodesThatFitPod |-prioritizeNodes |-selectHost
排程過程
// pkg/scheduler/core/generic_scheduler.go // 將 pod 排程到指定 node list 中的某臺 node 上 func (g *genericScheduler) Schedule(ctx context.Context, fwk framework.Framework, state *framework.CycleState, pod *v1.Pod) (result ScheduleResult, err error) { feasibleNodes, diagnosis := g.findNodesThatFitPod(ctx, fwk, state, pod) // 過濾可用 nodes if len(feasibleNodes) == 0 { return result, &framework.FitError{} } if len(feasibleNodes) == 1 { // 可用 node 只有一個,就選它了 return ScheduleResult{SuggestedHost: feasibleNodes[0].Name}, nil } priorityList := g.prioritizeNodes(ctx, fwk, state, pod, feasibleNodes) host := g.selectHost(priorityList) return ScheduleResult{ SuggestedHost: host, EvaluatedNodes: len(feasibleNodes) + len(diagnosis.NodeToStatusMap), FeasibleNodes: len(feasibleNodes), }, err } // Filters nodes that fit the pod based on the framework filter plugins and filter extenders. func (g *genericScheduler) findNodesThatFitPod(ctx context.Context, fwk framework.Framework, state *framework.CycleState, pod *v1.Pod) ([]*v1.Node, framework.Diagnosis, error) { diagnosis := framework.Diagnosis{ NodeToStatusMap: make(framework.NodeToStatusMap), UnschedulablePlugins: sets.NewString(), } // Run "prefilter" plugins. s := fwk.RunPreFilterPlugins(ctx, state, pod) allNodes := g.nodeInfoSnapshot.NodeInfos().List() if len(pod.Status.NominatedNodeName) > 0 && featureGate.Enabled(features.PreferNominatedNode) { feasibleNodes := g.evaluateNominatedNode(ctx, pod, fwk, state, diagnosis) if len(feasibleNodes) != 0 { return feasibleNodes, diagnosis, nil } } feasibleNodes := g.findNodesThatPassFilters(ctx, fwk, state, pod, diagnosis, allNodes) feasibleNodes = g.findNodesThatPassExtenders(pod, feasibleNodes, diagnosis.NodeToStatusMap) return feasibleNodes, diagnosis, nil }
它會過濾 過濾 PodSpect 中 NodeName 欄位為空的 pods ,嘗試為這樣的 pods 挑選一個 node 排程上去。
排程演算法
下面簡單看下內建的預設排程演算法。
註冊預設 predicates
這些 predicates 其實都是函式,被呼叫到時,執行相應的 過濾 。 例如, 如果 PodSpec 裡面顯式要求了 CPU 或 RAM 資源,而一個 node 無法滿足這些條件 , 那就會將這個 node 從備選列表中刪除。
// pkg/scheduler/algorithmprovider/registry.go // NewRegistry returns an algorithm provider registry instance. func NewRegistry() Registry { defaultConfig := getDefaultConfig() applyFeatureGates(defaultConfig) caConfig := getClusterAutoscalerConfig() applyFeatureGates(caConfig) return Registry{ schedulerapi.SchedulerDefaultProviderName: defaultConfig, ClusterAutoscalerProvider: caConfig, } } func getDefaultConfig() *schedulerapi.Plugins { plugins := &schedulerapi.Plugins{ PreFilter: schedulerapi.PluginSet{...}, Filter: schedulerapi.PluginSet{ Enabled: []schedulerapi.Plugin{ {Name: nodename.Name}, // 指定 node name 排程 {Name: tainttoleration.Name}, // 指定 toleration 排程 {Name: nodeaffinity.Name}, // 指定 node affinity 排程 ... }, }, PostFilter: schedulerapi.PluginSet{...}, PreScore: schedulerapi.PluginSet{...}, Score: schedulerapi.PluginSet{ Enabled: []schedulerapi.Plugin{ {Name: interpodaffinity.Name, Weight: 1}, {Name: nodeaffinity.Name, Weight: 1}, {Name: tainttoleration.Name, Weight: 1}, ... }, }, Reserve: schedulerapi.PluginSet{...}, PreBind: schedulerapi.PluginSet{...}, Bind: schedulerapi.PluginSet{...}, } return plugins }
plugin 的實現見 pkg/scheduler/framework/plugins/
,以 nodename
filter 為例:
// pkg/scheduler/framework/plugins/nodename/node_name.go // Filter invoked at the filter extension point. func (pl *NodeName) Filter(ctx context.Context, pod *v1.Pod, nodeInfo *framework.NodeInfo) *framework.Status { if !Fits(pod, nodeInfo) { return framework.NewStatus(UnschedulableAndUnresolvable, ErrReason) } return nil } // 如果 pod 沒有指定 NodeName,或者指定的 NodeName 等於該 node 的 name,返回 true;其他返回 false func Fits(pod *v1.Pod, nodeInfo *framework.NodeInfo) bool { return len(pod.Spec.NodeName) == 0 || pod.Spec.NodeName == nodeInfo.Node().Name }
對篩選出的 node 排序
選擇了合適的 nodes 之後,接下來會執行一系列 priority function 對這些 nodes 進行排序 。 例如,如果演算法是希望將 pods 儘量分散到整個叢集,那 priority 會選擇資源儘量空閒的節點。
這些函式會給每個 node 打分, 得分最高的 node 會被選中 ,排程到該節點。
// pkg/scheduler/core/generic_scheduler.go // 執行打分外掛(score plugins)對 nodes 進行排序。 func (g *genericScheduler) prioritizeNodes(ctx context.Context, fwk framework.Framework, state *framework.CycleState, pod *v1.Pod, nodes []*v1.Node,) (framework.NodeScoreList, error) { // 如果沒有指定 priority 配置,所有 node 將都得 1 分。 if len(g.extenders) == 0 && !fwk.HasScorePlugins() { result := make(framework.NodeScoreList, 0, len(nodes)) for i := range nodes { result = append(result, framework.NodeScore{ Name: nodes[i].Name, Score: 1 }) } return result, nil } preScoreStatus := fwk.RunPreScorePlugins(ctx, state, pod, nodes) // PreScoe 外掛 scoresMap, scoreStatus := fwk.RunScorePlugins(ctx, state, pod, nodes) // Score 外掛 result := make(framework.NodeScoreList, 0, len(nodes)) for i := range nodes { result = append(result, framework.NodeScore{Name: nodes[i].Name, Score: 0}) for j := range scoresMap { result[i].Score += scoresMap[j][i].Score } } if len(g.extenders) != 0 && nodes != nil { combinedScores := make(map[string]int64, len(nodes)) for i := range g.extenders { if !g.extenders[i].IsInterested(pod) { continue } go func(extIndex int) { prioritizedList, weight := g.extenders[extIndex].Prioritize(pod, nodes) for i := range *prioritizedList { host, score := (*prioritizedList)[i].Host, (*prioritizedList)[i].Score combinedScores[host] += score * weight } }(i) } for i := range result { result[i].Score += combinedScores[result[i].Name] * (MaxNodeScore / MaxExtenderPriority) } } return result, nil }
建立 v1.Binding
物件
演算法選出一個 node 之後,排程器會 建立一個 Binding 物件 , Pod 的 ObjectReference 欄位的值就是選中的 node 的名字 。
// pkg/scheduler/framework/runtime/framework.go func (f *frameworkImpl) runBindPlugin(ctx context.Context, bp BindPlugin, state *CycleState, pod *v1.Pod, nodeName string) *framework.Status { if !state.ShouldRecordPluginMetrics() { return bp.Bind(ctx, state, pod, nodeName) } status := bp.Bind(ctx, state, pod, nodeName) return status }
// pkg/scheduler/framework/plugins/defaultbinder/default_binder.go // Bind binds pods to nodes using the k8s client. func (b DefaultBinder) Bind(ctx, state *CycleState, p *v1.Pod, nodeName string) *framework.Status { binding := &v1.Binding{ ObjectMeta: metav1.ObjectMeta{Namespace: p.Namespace, Name: p.Name, UID: p.UID}, Target: v1.ObjectReference{Kind: "Node", Name: nodeName}, // ObjectReference 欄位為 nodeName } b.handle.ClientSet().CoreV1().Pods(binding.Namespace).Bind(ctx, binding, metav1.CreateOptions{}) }
如上,最後 ClientSet().CoreV1().Pods(binding.Namespace).Bind()
通過一個
POST 請求發給 apiserver
。
kube-apiserver 更新 pod 物件
kube-apiserver 收到這個 Binding object 請求後,registry 反序列化物件,更新 Pod 物件的下列欄位:
- 設定 NodeName
- 新增 annotations
-
設定
PodScheduled
status 為True
// pkg/registry/core/pod/storage/storage.go func (r *BindingREST) setPodHostAndAnnotations(ctx context.Context, podID, oldMachine, machine string, annotations map[string]string, dryRun bool) (finalPod *api.Pod, err error) { podKey := r.store.KeyFunc(ctx, podID) r.store.Storage.GuaranteedUpdate(ctx, podKey, &api.Pod{}, false, nil, storage.SimpleUpdate(func(obj runtime.Object) (runtime.Object, error) { pod, ok := obj.(*api.Pod) pod.Spec.NodeName = machine if pod.Annotations == nil { pod.Annotations = make(map[string]string) } for k, v := range annotations { pod.Annotations[k] = v } podutil.UpdatePodCondition(&pod.Status, &api.PodCondition{ Type: api.PodScheduled, Status: api.ConditionTrue, }) return pod, nil }), dryRun, nil) }
自定義排程器
predicate 和 priority function 都是可擴充套件的,可以通過 --policy-config-file
指定。
K8s 還可以自定義排程器(自己實現排程邏輯)。 如果 PodSpec 中 schedulerName 欄位不為空 ,K8s 就會 將這個 pod 的排程權交給指定的排程器。
5.5 小結
總結一下前面已經完成的步驟:
- HTTP 請求通過了認證、鑑權、admission control
- Deployment, ReplicaSet 和 Pod resources 已經持久化到 etcd
- 一系列 initializers 已經執行完畢,
- 每個 Pod 也已經排程到了合適的 node 上。
但是, 到目前為止,我們看到的所有東西(狀態),還只是存在於 etcd 中的元資料 。 下一步就是將這些狀態同步到計算節點上,然後計算節點上的 agent(kubelet)就開始幹活了。
- [譯] 為 K8s workload 引入的一些 BPF datapath 擴充套件(LPC, 2021)
- [譯] [論文] 可虛擬化第三代(計算機)架構的規範化條件(ACM, 1974)
- [譯] NAT 穿透是如何工作的:技術原理及企業級實踐(Tailscale, 2020)
- [譯] 寫給工程師:關於證書(certificate)和公鑰基礎設施(PKI)的一切(SmallStep, 2018)
- [譯] 基於角色的訪問控制(RBAC):演進歷史、設計理念及簡潔實現(Tailscale, 2021)
- [譯] Control Group v2(cgroupv2 權威指南)(KernelDoc, 2021)
- [譯] Linux Socket Filtering (LSF, aka BPF)(KernelDoc,2021)
- [譯] LLVM eBPF 彙編程式設計(2020)
- [譯] Cilium:BPF 和 XDP 參考指南(2021)
- BPF 進階筆記(三):BPF Map 核心實現
- BPF 進階筆記(二):BPF Map 型別詳解:使用場景、程式示例
- BPF 進階筆記(一):BPF 程式型別詳解:使用場景、函式簽名、執行位置及程式示例
- 原始碼解析:K8s 建立 pod 時,背後發生了什麼(四)(2021)
- 原始碼解析:K8s 建立 pod 時,背後發生了什麼(三)(2021)
- [譯] 邁向完全可程式設計 tc 分類器(NetdevConf,2016)
- [譯] 雲原生世界中的資料包標記(packet mark)(LPC, 2020)
- [譯] 利用 eBPF 支撐大規模 K8s Service (LPC, 2019)
- 計算規模驅動下的網路方案演進
- 邁入 Cilium BGP 的雲原生網路時代
- [譯] BeyondProd:雲原生安全的一種新方法(Google, 2019)