TiDB Operator 原始碼閱讀 (二) Operator 模式
在 上一篇文章 中我們討論了 TiDB Operator 的應用場景,瞭解了 TiDB Operator 可以在 Kubernetes 叢集中管理 TiDB 的生命週期。可是,TiDB Operator 的程式碼是怎樣執行起來的?TiDB 元件的生命週期管理的邏輯又是如何編排的呢?我們將 從 Operator 模式的視角,介紹 TiDB Operator 的程式碼實現,並主要討論 controller-manager 的實現,介紹從程式碼入口到元件的生命週期事件被觸發的中間過程。
Operator模式的演化
TiDB Operator 參考了 kube-controller-manager 的設計,瞭解 Kubernetes 的設計有助於瞭解 TiDB Operator 的程式碼邏輯。Kubernetes 內的 Resources 都是通過 Controller 實現生命週期管理的,例如 Namespace、Node、Deployment、Statefulset 等等,這些 Controller 的程式碼在 kube-controller-manager 中實現並由 kube-controller-manager 啟動後呼叫。
為了支援使用者自定義資源的開發需求,Kubernetes 社群基於上面的開發經驗,提出了 Operator 模式。 Operator 模式的實現過程 具體為:
1. 建立自定義資源的描述,即 Kubernetes 內的 CRD (CustomResourceDefinition);
2. 建立自定義資源,基於 CRD 建立 CR (CustomResource) 物件;
3. 實現自定義資源的 Controller,處理 CR 及其關聯資源的變更需求;
4. Controller 通過比對資源最新狀態和期望狀態,逐步完成運維操作,實現最終資源狀態與期望狀態一致。
通過定義 CRD 和實現對應 Controller,開發者無需將程式碼合併到 Kubernetes 中編譯使用,即可完成一個資源的生命週期管理的功能實現。
TiDB Operator 的 Controller Manager
TiDB Operator 使用 tidb-controller-manager 管理各個 CRD 的 Controller。從 cmd/controller-manager/main.go 開始,tidb-controller-manager 首先載入了 kubeconfig,用於連線 kube-apiserver,然後使用一系列 NewController 函式,載入了各個 Controller 的初始化函式。
controllers := []Controller{ tidbcluster.NewController(deps), dmcluster.NewController(deps), backup.NewController(deps), restore.NewController(deps), backupschedule.NewController(deps), tidbinitializer.NewController(deps), tidbmonitor.NewController(deps), }
在 Controller 的初始化函式過程中,會初始化一系列 Informer,這些 Informer 主要用來和 kube-apiserver 互動獲取 CRD 和相關資源的變更。以 TiDBCluster 為例,在初始化函式 NewController 中,會初始化 Informer 物件:
tidbClusterInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ AddFunc: c.enqueueTidbCluster, UpdateFunc: func(old, cur interface{}) { c.enqueueTidbCluster(cur) }, DeleteFunc: c.enqueueTidbCluster, }) statefulsetInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ AddFunc: c.addStatefulSet, UpdateFunc: func(old, cur interface{}) { c.updateStatefulSet(old, cur) }, DeleteFunc: c.deleteStatefulSet, })
Informer 中添加了處理新增、更新、刪除事件的 EventHandler,把監聽到的事件涉及到的 CR 的 Key 加入佇列。
初始化完成後啟動 InformerFactory 並等待 Cache 同步完成。
informerFactories := []InformerFactory{ deps.InformerFactory, deps.KubeInformerFactory, deps.LabelFilterKubeInformerFactory, } for _, f := range informerFactories { f.Start(ctx.Done()) for v, synced := range f.WaitForCacheSync(wait.NeverStop) { if !synced { klog.Fatalf("error syncing informer for %v", v) } } }
隨後 tidb-controller-manager 會呼叫各個 Controller 的 Run 函式,開始迴圈執行 Controller 的內部邏輯。
// Start syncLoop for all controllers for _,controller := range controllers { c := controller go wait.Forever(func() { c.Run(cliCfg.Workers,ctx.Done()) },cliCfg.WaitDuration) }
以 TiDBCluster Controller 為例,Run 函式會啟動 worker 處理工作佇列。
// Run runs the tidbcluster controller. func (c *Controller) Run(workers int, stopCh <-chan struct{}) { defer utilruntime.HandleCrash() defer c.queue.ShutDown() klog.Info("Starting tidbcluster controller") defer klog.Info("Shutting down tidbcluster controller") for i := 0; i < workers; i++ { go wait.Until(c.worker, time.Second, stopCh) } <-stopCh }
Worker 會呼叫 processNextWorkItem 函式,彈出佇列的元素,然後呼叫 Sync 函式進行同步:
// worker runs a worker goroutine that invokes processNextWorkItem until the the controller's queue is closed func (c *Controller) worker() { for c.processNextWorkItem() { } } // processNextWorkItem dequeues items, processes them, and marks them done. It enforces that the syncHandler is never // invoked concurrently with the same key. func (c *Controller) processNextWorkItem() bool { key, quit := c.queue.Get() if quit { return false } defer c.queue.Done(key) if err := c.sync(key.(string)); err != nil { if perrors.Find(err, controller.IsRequeueError) != nil { klog.Infof("TidbCluster: %v, still need sync: %v, requeuing", key.(string), err) } else { utilruntime.HandleError(fmt.Errorf("TidbCluster: %v, sync failed %v, requeuing", key.(string), err)) } c.queue.AddRateLimited(key) } else { c.queue.Forget(key) } return true }
Sync 函式會根據 Key 獲取對應的 CR 物件,例如這裡的 TiDBCluster 物件,然後對這個 TiDBCluster 物件進行同步。
// sync syncs the given tidbcluster. func (c *Controller) sync(key string) error { startTime := time.Now() defer func() { klog.V(4).Infof("Finished syncing TidbCluster %q (%v)", key, time.Since(startTime)) }() ns, name, err := cache.SplitMetaNamespaceKey(key) if err != nil { return err } tc, err := c.deps.TiDBClusterLister.TidbClusters(ns).Get(name) if errors.IsNotFound(err) { klog.Infof("TidbCluster has been deleted %v", key) return nil } if err != nil { return err } return c.syncTidbCluster(tc.DeepCopy()) } func (c *Controller) syncTidbCluster(tc *v1alpha1.TidbCluster) error { return c.control.UpdateTidbCluster(tc) }
syncTidbCluster 函式呼叫 updateTidbCluster 函式,進而呼叫一系列元件的 Sync 函式實現 TiDB 叢集管理的相關工作。在 pkg/controller/tidbcluster/tidbclustercontrol.go 的 updateTidbCluster 函式實現中,我們可以看到各個元件的 Sync 函式在這裡呼叫,在相關呼叫程式碼註釋裡描述著每個 Sync 函式執行的生命週期操作事件,可以幫助理解每個元件的 Reconcile 需要完成哪些工作,例如 PD 元件:
// works that should do to making the pd cluster current state match the desired state: // - create or update the pd service // - create or update the pd headless service // - create the pd statefulset // - sync pd cluster status from pd to TidbCluster object // - upgrade the pd cluster // - scale out/in the pd cluster // - failover the pd cluster if err := c.pdMemberManager.Sync(tc); err != nil { return err }
我們將在下篇文章中介紹元件的 Sync 函式完成了哪些工作,TiDBCluster Controller 是怎樣完成各個元件的生命週期管理。
小結
通過這篇文章,我們瞭解到 TiDB Operator 如何從 cmd/controller-manager/main.go 初始化執行和如何實現對應的 controller 物件,並以 TidbCluster Controller 為例介紹了 Controller 從初始化到實際工作的過程以及 Controller 內部的工作邏輯。通過上面的程式碼執行邏輯的介紹,我們清楚了元件的生命週期控制迴圈是如何被觸發的,問題已經被縮小到如何細化這個控制迴圈,新增 TiDB 特殊的運維邏輯,使得 TiDB 能在 Kubernetes 上部署和正常執行,完成其他的生命週期操作。我們將在下一篇文章中討論如何細化這個控制迴圈,討論元件的控制迴圈的實現。
我們介紹了社群對於 Operator 模式的探索和演化。對於一些希望使用 Operator 模式開發資源管理系統的小夥伴,Kubernetes 社群中提供了 Kubebuilder 和 Operator Framework 兩個 Controller 腳手架專案。相比於參考 kubernetes/sample-controller 進行開發,Operator 腳手架基於 kubernetes-sigs/controller-runtime 生成 Controller 程式碼,減少了許多重複引入的模板化的程式碼。開發者只需要專注於完成 CRD 物件的控制迴圈部分即可,而不需要關心控制迴圈啟動之前的準備工作。
如果有什麼好的想法,歡迎通過 #sig-k8s 或 pingcap/tidb-operator 參與 TiDB Operator 社群交流。