淺析kubernetes中client-go Informer
之前瞭解了client-go中的架構設計,也就是 tools/cache
下面的一些概念,那麼下面將對informer進行分析
Controller
在client-go informer架構中存在一個 controller
,這個不是 Kubernetes 中的Controller元件;而是在 tools/cache
中的一個概念, controller
位於 informer 之下,Reflector 之上。 code
Config
從嚴格意義上來講, controller
是作為一個 sharedInformer
使用,通過接受一個 Config
,而 Reflector
則作為 controller
的 slot。 Config
則包含了這個 controller
裡所有的設定。
type Config struct { Queue // DeltaFIFO ListerWatcher // 用於list watch的 Process ProcessFunc // 定義如何從DeltaFIFO中彈出資料後處理的操作 ObjectType runtime.Object // Controller處理的物件資料,實際上就是kubernetes中的資源 FullResyncPeriod time.Duration // 全量同步的週期 ShouldResync ShouldResyncFunc // Reflector通過該標記來確定是否應該重新同步 RetryOnError bool }
controller
然後 controller
又為 reflertor
的上層
type controller struct { config Config reflector *Reflector reflectorMutex sync.RWMutex clock clock.Clock } type Controller interface { // controller 主要做兩件事, // 1. 構建並執行 Reflector,將listerwacther中的泵壓到queue(Delta fifo)中 // 2. Queue用Pop()彈出資料,具體的操作是Process // 直到 stopCh 不阻塞,這兩個協程將退出 Run(stopCh <-chan struct{}) HasSynced() bool // 這個實際上是從store中繼承的,標記這個controller已經 LastSyncResourceVersion() string }
controller
中的方法,僅有一個 Run()
和 New()
;這意味著, controller
只是一個抽象的概念,作為 Reflector
, Delta FIFO
整合的工作流
而 controller
則是 SharedInformer
了。
Queue
這裡的 queue
可以理解為是一個具有 Pop()
功能的 Indexer
;而 Pop()
的功能則是 controller
中的一部分;也就是說 queue
是一個擴充套件的 Store
, Store
是不具備彈出功能的。
type Queue interface { Store // Pop會阻塞等待,直到有內容彈出,刪除對應的值並處理計數器 Pop(PopProcessFunc) (interface{}, error) // AddIfNotPresent puts the given accumulator into the Queue (in // association with the accumulator's key) if and only if that key // is not already associated with a non-empty accumulator. AddIfNotPresent(interface{}) error // HasSynced returns true if the first batch of keys have all been // popped. The first batch of keys are those of the first Replace // operation if that happened before any Add, Update, or Delete; // otherwise the first batch is empty. HasSynced() bool Close() // 關閉queue }
而彈出的操作是通過 controller 中的 processLoop()
進行的,最終走到Delta FIFO中進行處理。
通過忙等待去讀取要彈出的資料,然後在彈出前 通過 PopProcessFunc
進行處理
func (c *controller) processLoop() { for { obj, err := c.config.Queue.Pop(PopProcessFunc(c.config.Process)) if err != nil { if err == ErrFIFOClosed { return } if c.config.RetryOnError { // This is the safe way to re-enqueue. c.config.Queue.AddIfNotPresent(obj) } } } }
func (f *DeltaFIFO) Pop(process PopProcessFunc) (interface{}, error) { f.lock.Lock() defer f.lock.Unlock() for { for len(f.queue) == 0 { // When the queue is empty, invocation of Pop() is blocked until new item is enqueued. // When Close() is called, the f.closed is set and the condition is broadcasted. // Which causes this loop to continue and return from the Pop(). if f.IsClosed() { return nil, ErrFIFOClosed } f.cond.Wait() } id := f.queue[0] f.queue = f.queue[1:] if f.initialPopulationCount > 0 { f.initialPopulationCount-- } item, ok := f.items[id] if !ok { // Item may have been deleted subsequently. continue } delete(f.items, id) err := process(item) // 進行處理 if e, ok := err.(ErrRequeue); ok { f.addIfNotPresent(id, item) // 如果失敗,再重新加入到佇列中 err = e.Err } // Don't need to copyDeltas here, because we're transferring // ownership to the caller. return item, err } }
Informer
通過對 Reflector
, Store
, Queue
, ListerWatcher
、 ProcessFunc
, 等的概念,發現由 controller
所包裝的起的功能並不能完成通過對API的動作監聽,並通過動作來處理本地快取的一個能力;這個情況下誕生了 informer
嚴格意義上來講是 sharedInformer
func newInformer( lw ListerWatcher, objType runtime.Object, resyncPeriod time.Duration, h ResourceEventHandler, clientState Store, ) Controller { // This will hold incoming changes. Note how we pass clientState in as a // KeyLister, that way resync operations will result in the correct set // of update/delete deltas. fifo := NewDeltaFIFOWithOptions(DeltaFIFOOptions{ KnownObjects: clientState, EmitDeltaTypeReplaced: true, }) cfg := &Config{ Queue: fifo, ListerWatcher: lw, ObjectType: objType, FullResyncPeriod: resyncPeriod, RetryOnError: false, Process: func(obj interface{}) error { // from oldest to newest for _, d := range obj.(Deltas) { switch d.Type { case Sync, Replaced, Added, Updated: if old, exists, err := clientState.Get(d.Object); err == nil && exists { if err := clientState.Update(d.Object); err != nil { return err } h.OnUpdate(old, d.Object) } else { if err := clientState.Add(d.Object); err != nil { return err } h.OnAdd(d.Object) } case Deleted: if err := clientState.Delete(d.Object); err != nil { return err } h.OnDelete(d.Object) } } return nil }, } return New(cfg) }
newInformer是位於 tools/cache/controller.go 下,可以看出,這裡面並沒有informer的概念,這裡通過註釋可以看到,newInformer實際上是一個提供了儲存和事件通知的informer。他關聯的 queue
則是 Delta FIFO
,幷包含了 ProcessFunc
, Store
等 controller的概念。最終對外的方法為 NewInformer()
func NewInformer( lw ListerWatcher, objType runtime.Object, resyncPeriod time.Duration, h ResourceEventHandler, ) (Store, Controller) { // This will hold the client state, as we know it. clientState := NewStore(DeletionHandlingMetaNamespaceKeyFunc) return clientState, newInformer(lw, objType, resyncPeriod, h, clientState) } type ResourceEventHandler interface { OnAdd(obj interface{}) OnUpdate(oldObj, newObj interface{}) OnDelete(obj interface{}) }
可以看到 NewInformer()
就是一個帶有 Store功能的controller,通過這些可以假定出, Informer 就是 controller
,將queue中相關操作分發給不同事件處理的功能
SharedIndexInformer
shareInformer
為客戶端提供了與apiserver一致的資料物件本地快取,並支援多事件處理程式的 informer ,而 shareIndexInformer
則是對 shareInformer
的擴充套件
type SharedInformer interface { // AddEventHandler adds an event handler to the shared informer using the shared informer's resync // period. Events to a single handler are delivered sequentially, but there is no coordination // between different handlers. AddEventHandler(handler ResourceEventHandler) // AddEventHandlerWithResyncPeriod adds an event handler to the // shared informer with the requested resync period; zero means // this handler does not care about resyncs. The resync operation // consists of delivering to the handler an update notification // for every object in the informer's local cache; it does not add // any interactions with the authoritative storage. Some // informers do no resyncs at all, not even for handlers added // with a non-zero resyncPeriod. For an informer that does // resyncs, and for each handler that requests resyncs, that // informer develops a nominal resync period that is no shorter // than the requested period but may be longer. The actual time // between any two resyncs may be longer than the nominal period // because the implementation takes time to do work and there may // be competing load and scheduling noise. AddEventHandlerWithResyncPeriod(handler ResourceEventHandler, resyncPeriod time.Duration) // GetStore returns the informer's local cache as a Store. GetStore() Store // GetController is deprecated, it does nothing useful GetController() Controller // Run starts and runs the shared informer, returning after it stops. // The informer will be stopped when stopCh is closed. Run(stopCh <-chan struct{}) // HasSynced returns true if the shared informer's store has been // informed by at least one full LIST of the authoritative state // of the informer's object collection. This is unrelated to "resync". HasSynced() bool // LastSyncResourceVersion is the resource version observed when last synced with the underlying // store. The value returned is not synchronized with access to the underlying store and is not // thread-safe. LastSyncResourceVersion() string }
SharedIndexInformer
是對SharedInformer的實現,可以從結構中看出, SharedIndexInformer
大致具有如下功能:
Deltal FIFO
type sharedIndexInformer struct { indexer Indexer // 具有索引的本地快取 controller Controller // controller processor *sharedProcessor // 事件處理函式集合 cacheMutationDetector MutationDetector listerWatcher ListerWatcher objectType runtime.Object resyncCheckPeriod time.Duration defaultEventHandlerResyncPeriod time.Duration clock clock.Clock started, stopped bool startedLock sync.Mutex blockDeltas sync.Mutex }
而在 tools/cache/share_informer.go 可以看到 shareIndexInformer 的執行過程
func (s *sharedIndexInformer) Run(stopCh <-chan struct{}) { defer utilruntime.HandleCrash() fifo := NewDeltaFIFOWithOptions(DeltaFIFOOptions{ KnownObjects: s.indexer, EmitDeltaTypeReplaced: true, }) cfg := &Config{ Queue: fifo, ListerWatcher: s.listerWatcher, ObjectType: s.objectType, FullResyncPeriod: s.resyncCheckPeriod, RetryOnError: false, ShouldResync: s.processor.shouldResync, Process: s.HandleDeltas, // process 彈出時操作的流程 } func() { s.startedLock.Lock() defer s.startedLock.Unlock() s.controller = New(cfg) s.controller.(*controller).clock = s.clock s.started = true }() // Separate stop channel because Processor should be stopped strictly after controller processorStopCh := make(chan struct{}) var wg wait.Group defer wg.Wait() // Wait for Processor to stop defer close(processorStopCh) // Tell Processor to stop wg.StartWithChannel(processorStopCh, s.cacheMutationDetector.Run) wg.StartWithChannel(processorStopCh, s.processor.run) // 啟動事件處理函式 defer func() { s.startedLock.Lock() defer s.startedLock.Unlock() s.stopped = true // Don't want any new listeners }() s.controller.Run(stopCh) // 啟動controller,controller會啟動Reflector和fifo的Pop() }
而在操作Delta FIFO中可以看到,做具體操作時,會將動作分發至對應的事件處理函式中,這個是informer初始化時對事件操作的函式
func (s *sharedIndexInformer) HandleDeltas(obj interface{}) error { s.blockDeltas.Lock() defer s.blockDeltas.Unlock() for _, d := range obj.(Deltas) { switch d.Type { case Sync, Replaced, Added, Updated: s.cacheMutationDetector.AddObject(d.Object) if old, exists, err := s.indexer.Get(d.Object); err == nil && exists { if err := s.indexer.Update(d.Object); err != nil { return err } isSync := false switch { case d.Type == Sync: isSync = true case d.Type == Replaced: if accessor, err := meta.Accessor(d.Object); err == nil { if oldAccessor, err := meta.Accessor(old); err == nil { isSync = accessor.GetResourceVersion() == oldAccessor.GetResourceVersion() } } } // 事件的分發 s.processor.distribute(updateNotification{oldObj: old, newObj: d.Object}, isSync) } else { if err := s.indexer.Add(d.Object); err != nil { return err } // 事件的分發 s.processor.distribute(addNotification{newObj: d.Object}, false) } case Deleted: if err := s.indexer.Delete(d.Object); err != nil { return err } s.processor.distribute(deleteNotification{oldObj: d.Object}, false) } } return nil }
事件處理函式 processor
啟動informer時也會啟動註冊進來的事件處理函式; processor
就是這個事件處理函式。
run()
函式會啟動兩個 listener,j監聽事件處理業務函式 listener.run
和 事件的處理
wg.StartWithChannel(processorStopCh, s.processor.run) func (p *sharedProcessor) run(stopCh <-chan struct{}) { func() { p.listenersLock.RLock() defer p.listenersLock.RUnlock() for _, listener := range p.listeners { p.wg.Start(listener.run) p.wg.Start(listener.pop) } p.listenersStarted = true }() <-stopCh p.listenersLock.RLock() defer p.listenersLock.RUnlock() for _, listener := range p.listeners { close(listener.addCh) // Tell .pop() to stop. .pop() will tell .run() to stop } p.wg.Wait() // Wait for all .pop() and .run() to stop }
可以看出,就是拿到的事件,根據註冊的到informer的事件函式進行處理
func (p *processorListener) run() { stopCh := make(chan struct{}) wait.Until(func() { for next := range p.nextCh { // 消費事件 switch notification := next.(type) { case updateNotification: p.handler.OnUpdate(notification.oldObj, notification.newObj) case addNotification: p.handler.OnAdd(notification.newObj) case deleteNotification: p.handler.OnDelete(notification.oldObj) default: utilruntime.HandleError(fmt.Errorf("unrecognized notification: %T", next)) } } // the only way to get here is if the p.nextCh is empty and closed close(stopCh) }, 1*time.Second, stopCh) }
informer中的事件的設計
瞭解了informer如何處理事件,就需要學習下,informer的事件系統設計 prossorListener
事件的新增
當在handleDelta時,會分發具體的事件
// 事件的分發 s.processor.distribute(updateNotification{oldObj: old, newObj: d.Object}, isSync)
此時,事件泵 Pop()
會根據接收到的事件進行處理
// run() 時會啟動一個事件泵 p.wg.Start(listener.pop) func (p *processorListener) pop() { defer utilruntime.HandleCrash() defer close(p.nextCh) var nextCh chan<- interface{} var notification interface{} for { select { case nextCh <- notification: // 這裡實際上是一個阻塞的等待 // 單向channel 可能不會走到這步驟 var ok bool // deltahandle 中 distribute 會將事件新增到addCh待處理事件中 // 處理完事件會再次拿到一個事件 notification, ok = p.pendingNotifications.ReadOne() if !ok { // Nothing to pop nextCh = nil // Disable this select case } // 處理 分發過來的事件 addCh case notificationToAdd, ok := <-p.addCh: // distribute分發的事件 if !ok { return } // 這裡代表第一次,沒有任何事件時,或者上面步驟完成讀取 if notification == nil { // 就會走這裡 notification = notificationToAdd nextCh = p.nextCh } else { // notification否則代表沒有處理完,將資料再次新增到待處理中 p.pendingNotifications.WriteOne(notificationToAdd) } } } }
該訊息事件的流程圖為
通過一個簡單例項來學習client-go中的訊息通知機制
package main import ( "fmt" "time" "k8s.io/utils/buffer" ) var nextCh1 = make(chan interface{}) var addCh = make(chan interface{}) var stopper = make(chan struct{}) var notification interface{} var pendding = *buffer.NewRingGrowing(2) func main() { // pop go func() { var nextCh chan<- interface{} var notification interface{} //var n int for { fmt.Println("busy wait") fmt.Println("entry select", notification) select { // 初始時,一個未初始化的channel,nil,形成一個阻塞(單channel下是死鎖) case nextCh <- notification: fmt.Println("entry nextCh", notification) var ok bool // 讀不到資料代表已處理完,置空鎖 notification, ok = pendding.ReadOne() if !ok { fmt.Println("unactive nextch") nextCh = nil } // 事件的分發,監聽,初始時也是一個阻塞 case notificationToAdd, ok := <-addCh: fmt.Println(notificationToAdd, notification) if !ok { return } // 執行緒安全 // 當訊息為空時,沒有被處理 // 鎖為空,就分發資料 if notification == nil { fmt.Println("frist notification nil") notification = notificationToAdd nextCh = nextCh1 // 這步驟等於初始化了區域性的nextCh,會觸發上面的流程 } else { // 在第三次時,會走到這裡,資料進入環 fmt.Println("into ring", notificationToAdd) pendding.WriteOne(notificationToAdd) } } } }() // producer go func() { i := 0 for { i++ if i%5 == 0 { addCh <- fmt.Sprintf("thread 2 inner -- %d", i) time.Sleep(time.Millisecond * 9000) } else { addCh <- fmt.Sprintf("thread 2 outer -- %d", i) time.Sleep(time.Millisecond * 500) } } }() // subsriber go func() { for { for next := range nextCh1 { time.Sleep(time.Millisecond * 300) fmt.Println("consumer", next) } } }() <-stopper }
總結,這裡的機制類似於執行緒安全,進入臨界區的一些演算法,臨界區就是 nextCh
, notification
就是保證了至少有一個程序可以進入臨界區(要麼分發事件,要麼生產事件); nextCh
和 nextCh1
一個是區域性管道一個是全域性的,管道未初始化代表了死鎖(阻塞);當有訊息要處理時,會將區域性管道 nextCh
賦值給 全域性 nextCh1
此時相當於解除了分發的步驟(對管道賦值,觸發分發操作); ringbuffer
實際上是提供了一個對 notification
加鎖的操作,在沒有處理的訊息時,需要保障 notification
為空,同時也關閉了流程 nextCh
的寫入。這裡主要是考慮對golang中channel的用法
- 分享自己平時使用的socket多客戶端通訊的程式碼技術點和軟體使用
- iNeuOS工業網際網路作業系統,增加2154個檢視建模(WEB組態)行業向量圖元、大屏背景及相關圖元
- 多臺雲伺服器的 Kubernetes 叢集搭建
- Elasticsearch學習系列四(聚合搜尋)
- 關於swiper外掛在vue2的使用
- 使用 Abp.Zero 搭建第三方登入模組(一):原理篇
- LVGL庫入門教程 - 顏色和影象
- 物聯網?快來看 Arduino 上雲啦
- SpringBoot JWT Redis 開源知識社群系統
- CVPR2022 | 可精簡域適應
- Spring框架系列(3) - 深入淺出Spring核心之控制反轉(IOC)
- 面試突擊59:一個表中可以有多個自增列嗎?
- CVPR2022 | 弱監督多標籤分類中的損失問題
- JDBC、ORM、JPA、Spring Data JPA,傻傻分不清楚?一文帶你釐清箇中曲直,給你個選擇SpringDataJPA的...
- Spring Security:使用者和Spring應用之間的安全屏障
- Mybatisi和Spring整合原始碼分析
- 前端學習 linux —— 第一篇
- call apply bind的作用及區別? 應用場景?
- Bika LIMS 開源LIMS集——實驗室檢驗流程概述及主頁、面板
- 軟體專案管理 7.5.專案進度模型(SPSP)