淺析Kubernetes中client-go informer

語言: CN / TW / HK

本篇文章對Kubernetes中client-go informer進行分析。

Controller

在client-go informer架構中存在一個  controller  ,這個不是 Kubernetes 中的Controller元件;而是在  tools/cache  中的一個概念, controller  位於 informer 之下,Reflector 之上。

Config

從嚴格意義上來講, controller  是作為一個  sharedInformer  使用,通過接受一個  Config  ,而  Reflector  則作為  controller  的 slot。 Config  則包含了這個  controller  裡所有的設定。

type Config struct {
Queue // DeltaFIFO
ListerWatcher // 用於list watch的
Process ProcessFunc // 定義如何從DeltaFIFO中彈出資料後處理的操作
ObjectType runtime.Object // Controller處理的物件資料,實際上就是kubernetes中的資源
FullResyncPeriod time.Duration // 全量同步的週期
ShouldResync ShouldResyncFunc // Reflector通過該標記來確定是否應該重新同步
RetryOnError bool
}

controller

然後  controller  又為  reflertor  的上層。

type controller struct {
config Config
reflector *Reflector
reflectorMutex sync.RWMutex
clock clock.Clock
}


type Controller interface {
// controller 主要做兩件事,
// 1. 構建並執行 Reflector,將listerwacther中的泵壓到queue(Delta fifo)中
// 2. Queue用Pop()彈出資料,具體的操作是Process
// 直到 stopCh 不阻塞,這兩個協程將退出
Run(stopCh <-chan struct{})
HasSynced() bool // 這個實際上是從store中繼承的,標記這個controller已經
LastSyncResourceVersion() string
}

controller  中的方法,僅有一個  Run()  和  New( ) ;這意味著, controller  只是一個抽象的概念,作為  Reflector Delta FIFO  整合的工作流。

而  controller  則是  SharedInformer  了。

Queue

這裡的  queue  可以理解為是一個具有  Pop()  功能的  Indexer  ;而  Pop()  的功能則是  controller  中的一部分;也就是說  queue  是一個擴充套件的  Store  ,  Store  是不具備彈出功能的。

type Queue interface {
  Store
// Pop會阻塞等待,直到有內容彈出,刪除對應的值並處理計數器
  Pop(PopProcessFunc) (interface{}, error)
 
// AddIfNotPresent puts the given accumulator into the Queue (in
// association with the accumulator's key) if and only if that key
// is not already associated with a non-empty accumulator.
  AddIfNotPresent(interface{}) error
 
// HasSynced returns true if the first batch of keys have all been
// popped.  The first batch of keys are those of the first Replace
// operation if that happened before any Add, Update, or Delete;
// otherwise the first batch is empty.
  HasSynced() bool
Close() // 關閉queue
}

而彈出的操作是通過 controller 中的  processLoop( )  進行的,最終走到Delta FIFO中進行處理。

通過忙等待去讀取要彈出的資料,然後在彈出前 通過 PopProcessFunc  進行處理。

func (c *controller) processLoop() {
for {
obj, err := c.config.Queue.Pop(PopProcessFunc(c.config.Process))
if err != nil {
if err == ErrFIFOClosed {
return
}
if c.config.RetryOnError {
// This is the safe way to re-enqueue.
c.config.Queue.AddIfNotPresent(obj)
}
}
}
}

DeltaFIFO.Pop(   )

func (f *DeltaFIFO) Pop(process PopProcessFunc) (interface{}, error) {
f.lock.Lock()
defer f.lock.Unlock()
for {
for len(f.queue) == 0 {
// When the queue is empty, invocation of Pop() is blocked until new item is enqueued.
// When Close() is called, the f.closed is set and the condition is broadcasted.
// Which causes this loop to continue and return from the Pop().
if f.IsClosed() {
return nil, ErrFIFOClosed
}


f.cond.Wait()
}
id := f.queue[0]
f.queue = f.queue[1:]
if f.initialPopulationCount > 0 {
f.initialPopulationCount--
}
item, ok := f.items[id]
if !ok {
// Item may have been deleted subsequently.
continue
}
delete(f.items, id)
err := process(item) // 進行處理
if e, ok := err.(ErrRequeue); ok {
f.addIfNotPresent(id, item) // 如果失敗,再重新加入到佇列中
err = e.Err
}
// Don't need to copyDeltas here, because we're transferring
// ownership to the caller.
return item, err
}
}

Informer

通過對  Reflector Store Queue ListerWatcher ProcessFunc , 等的概念,發現由  controller  所包裝的起的功能並不能完成通過對API的動作監聽,並通過動作來處理本地快取的一個能力;這個情況下誕生了  informer  嚴格意義上來講是 sharedInformer。

func newInformer(
lw ListerWatcher,
objType runtime.Object,
resyncPeriod time.Duration,
h ResourceEventHandler,
clientState Store,
) Controller {
// This will hold incoming changes. Note how we pass clientState in as a
// KeyLister, that way resync operations will result in the correct set
// of update/delete deltas.
fifo := NewDeltaFIFOWithOptions(DeltaFIFOOptions{
KnownObjects: clientState,
EmitDeltaTypeReplaced: true,
})


cfg := &Config{
Queue: fifo,
ListerWatcher: lw,
ObjectType: objType,
FullResyncPeriod: resyncPeriod,
RetryOnError: false,


Process: func(obj interface{}) error {
// from oldest to newest
for _, d := range obj.(Deltas) {
switch d.Type {
case Sync, Replaced, Added, Updated:
if old, exists, err := clientState.Get(d.Object); err == nil && exists {
if err := clientState.Update(d.Object); err != nil {
return err
}
h.OnUpdate(old, d.Object)
} else {
if err := clientState.Add(d.Object); err != nil {
return err
}
h.OnAdd(d.Object)
}
case Deleted:
if err := clientState.Delete(d.Object); err != nil {
return err
}
h.OnDelete(d.Object)
}
}
return nil
},
}
return New(cfg)
}

newInformer是位於  tools/cache/controller.go  下,可以看出,這裡面並沒有informer的概念,這裡通過註釋可以看到,newInformer實際上是一個提供了儲存和事件通知的informer。他關聯的  queue  則是  Delta FIFO ,幷包含了  ProcessFunc Store  等 controller的概念。最終對外的方法為  NewInformer(  )。

func NewInformer(
lw ListerWatcher,
objType runtime.Object,
resyncPeriod time.Duration,
h ResourceEventHandler,
) (Store, Controller) {
// This will hold the client state, as we know it.
clientState := NewStore(DeletionHandlingMetaNamespaceKeyFunc)


return clientState, newInformer(lw, objType, resyncPeriod, h, clientState)
}


type ResourceEventHandler interface {
OnAdd(obj interface{})
OnUpdate(oldObj, newObj interface{})
OnDelete(obj interface{})
}

SharedIndexInformer

shareInformer  為客戶端提供了與apiserver一致的資料物件本地快取,並支援多事件處理程式的 informer ,而  shareIndexInformer  則是對 shareInformer  的擴充套件。

type SharedInformer interface {  // AddEventHandler adds an event handler to the shared informer using the shared informer's resync  // period.  Events to a single handler are delivered sequentially, but there is no coordination  // between different handlers.  AddEventHandler(handler ResourceEventHandler)  // AddEventHandlerWithResyncPeriod adds an event handler to the  // shared informer with the requested resync period; zero means  // this handler does not care about resyncs.  The resync operation  // consists of delivering to the handler an update notification  // for every object in the informer's local cache; it does not add  // any interactions with the authoritative storage.  Some  // informers do no resyncs at all, not even for handlers added  // with a non-zero resyncPeriod.  For an informer that does  // resyncs, and for each handler that requests resyncs, that  // informer develops a nominal resync period that is no shorter  // than the requested period but may be longer.  The actual time  // between any two resyncs may be longer than the nominal period  // because the implementation takes time to do work and there may  // be competing load and scheduling noise.  AddEventHandlerWithResyncPeriod(handler ResourceEventHandler, resyncPeriod time.Duration)  // GetStore returns the informer's local cache as a Store.  GetStore() Store  // GetController is deprecated, it does nothing useful  GetController() Controller  // Run starts and runs the shared informer, returning after it stops.  // The informer will be stopped when stopCh is closed.  Run(stopCh <-chan struct{})  // HasSynced returns true if the shared informer's store has been  // informed by at least one full LIST of the authoritative state  // of the informer's object collection.  This is unrelated to "resync".  HasSynced() bool  // LastSyncResourceVersion is the resource version observed when last synced with the underlying  // store. The value returned is not synchronized with access to the underlying store and is not  // thread-safe.  LastSyncResourceVersion() string}

SharedIndexInformer  是對SharedInformer的實現,可以從結構中看出, SharedIndexInformer  大致具有如下功能:

·  索引本地快取

·  controller,通過list watch拉取API並推入  Deltal FIFO

·  事件的處理

type sharedIndexInformer struct {
indexer Indexer // 具有索引的本地快取
controller Controller // controller


processor *sharedProcessor // 事件處理函式集合
cacheMutationDetector MutationDetector


listerWatcher ListerWatcher
objectType runtime.Object
resyncCheckPeriod time.Duration
defaultEventHandlerResyncPeriod time.Duration
clock clock.Clock
started, stopped bool
startedLock sync.Mutex
blockDeltas sync.Mutex
}

而在  tools/cache/share_informer.go  可以看到 shareIndexInformer 的執行過程。

func (s *sharedIndexInformer) Run(stopCh <-chan struct{}) {
defer utilruntime.HandleCrash()


fifo := NewDeltaFIFOWithOptions(DeltaFIFOOptions{
KnownObjects: s.indexer,
EmitDeltaTypeReplaced: true,
})


cfg := &Config{
Queue: fifo,
ListerWatcher: s.listerWatcher,
ObjectType: s.objectType,
FullResyncPeriod: s.resyncCheckPeriod,
RetryOnError: false,
ShouldResync: s.processor.shouldResync,


Process: s.HandleDeltas, // process 彈出時操作的流程
}


func() {
s.startedLock.Lock()
defer s.startedLock.Unlock()


s.controller = New(cfg)
s.controller.(*controller).clock = s.clock
s.started = true
}()


// Separate stop channel because Processor should be stopped strictly after controller
processorStopCh := make(chan struct{})
var wg wait.Group
defer wg.Wait() // Wait for Processor to stop
defer close(processorStopCh) // Tell Processor to stop
wg.StartWithChannel(processorStopCh, s.cacheMutationDetector.Run)
wg.StartWithChannel(processorStopCh, s.processor.run) // 啟動事件處理函式


defer func() {
s.startedLock.Lock()
defer s.startedLock.Unlock()
s.stopped = true // Don't want any new listeners
}()
s.controller.Run(stopCh) // 啟動controller,controller會啟動Reflector和fifo的Pop()
}

而在操作Delt a FIFO中可以看到,做具體操作時,會將動作分發至對應的事件處理函式中,這個是informer初始化時對事件操作的函式。

func (s *sharedIndexInformer) HandleDeltas(obj interface{}) error {
s.blockDeltas.Lock()
defer s.blockDeltas.Unlock()




for _, d := range obj.(Deltas) {
switch d.Type {
case Sync, Replaced, Added, Updated:
s.cacheMutationDetector.AddObject(d.Object)
if old, exists, err := s.indexer.Get(d.Object); err == nil && exists {
if err := s.indexer.Update(d.Object); err != nil {
return err
}


isSync := false
switch {
case d.Type == Sync:
isSync = true
case d.Type == Replaced:
if accessor, err := meta.Accessor(d.Object); err == nil {
if oldAccessor, err := meta.Accessor(old); err == nil {
isSync = accessor.GetResourceVersion() == oldAccessor.GetResourceVersion()
}
}
}
// 事件的分發
s.processor.distribute(updateNotification{oldObj: old, newObj: d.Object}, isSync)
} else {
if err := s.indexer.Add(d.Object); err != nil {
return err
}
// 事件的分發
s.processor.distribute(addNotification{newObj: d.Object}, false)
}
case Deleted:
if err := s.indexer.Delete(d.Object); err != nil {
return err
}
s.processor.distribute(deleteNotification{oldObj: d.Object}, false)
}
}
return nil
}

事件處理函式 processor

啟動informer時也會啟動註冊進來的事件處理函式; processor  就是這個事件處理函式。

run( )  函式會啟動兩個 listener,j監聽事件處理業務函式  listener.run  和 事件的處理。

wg.StartWithChannel(processorStopCh, s.processor.run)


func (p *sharedProcessor) run(stopCh <-chan struct{}) {
func() {
p.listenersLock.RLock()
defer p.listenersLock.RUnlock()
for _, listener := range p.listeners {
p.wg.Start(listener.run)
p.wg.Start(listener.pop)
}
p.listenersStarted = true
}()
<-stopCh
p.listenersLock.RLock()
defer p.listenersLock.RUnlock()
for _, listener := range p.listeners {
close(listener.addCh) // Tell .pop() to stop. .pop() will tell .run() to stop
}
p.wg.Wait() // Wait for all .pop() and .run() to stop
}

可以看出,就是拿到的事件,根據註冊的到informer的事件函式進行處理。

func (p *processorListener) run() {
stopCh := make(chan struct{})
wait.Until(func() {
for next := range p.nextCh { // 消費事件
switch notification := next.(type) {
case updateNotification:
p.handler.OnUpdate(notification.oldObj, notification.newObj)
case addNotification:
p.handler.OnAdd(notification.newObj)
case deleteNotification:
p.handler.OnDelete(notification.oldObj)
default:
utilruntime.HandleError(fmt.Errorf("unrecognized notification: %T", next))
}
}
// the only way to get here is if the p.nextCh is empty and closed
close(stopCh)
}, 1*time.Second, stopCh)
}

informer中的事件的設計

瞭解了informer如何處理事件,就需要學習下,informer的事件系統設計  prossorListener

事件的新增

當在handleDelta時,會分發具體的事件

// 事件的分發
s.processor.distribute(updateNotification{oldObj: old, newObj: d.Object}, isSync)

此時,事件泵  Pop( )  會根據接收到的事件進行處理。

// run() 時會啟動一個事件泵
p.wg.Start(listener.pop)


func (p *processorListener) pop() {
defer utilruntime.HandleCrash()
defer close(p.nextCh)


var nextCh chan<- interface{}
var notification interface{}
for {
select {
case nextCh <- notification: // 這裡實際上是一個阻塞的等待
// 單向channel 可能不會走到這步驟
var ok bool
// deltahandle 中 distribute 會將事件新增到addCh待處理事件中
// 處理完事件會再次拿到一個事件
notification, ok = p.pendingNotifications.ReadOne()
if !ok { // Nothing to pop
nextCh = nil // Disable this select case
}
// 處理 分發過來的事件 addCh
case notificationToAdd, ok := <-p.addCh: // distribute分發的事件
if !ok {
return
}
// 這裡代表第一次,沒有任何事件時,或者上面步驟完成讀取
if notification == nil { // 就會走這裡
notification = notificationToAdd
nextCh = p.nextCh
} else {
// notification否則代表沒有處理完,將資料再次新增到待處理中
p.pendingNotifications.WriteOne(notificationToAdd)
}
}
}
}

該訊息 事件的流程圖為:

通過一個簡單例項來學習client-go中的訊息通知機制:

package main


import (
"fmt"
"time"


"k8s.io/utils/buffer"
)


var nextCh1 = make(chan interface{})
var addCh = make(chan interface{})
var stopper = make(chan struct{})
var notification interface{}
var pendding = *buffer.NewRingGrowing(2)


func main() {
// pop
go func() {
var nextCh chan<- interface{}
var notification interface{}
//var n int
for {
fmt.Println("busy wait")
fmt.Println("entry select", notification)
select {
// 初始時,一個未初始化的channel,nil,形成一個阻塞(單channel下是死鎖)
case nextCh <- notification:
fmt.Println("entry nextCh", notification)
var ok bool
// 讀不到資料代表已處理完,置空鎖
notification, ok = pendding.ReadOne()
if !ok {
fmt.Println("unactive nextch")
nextCh = nil
}
// 事件的分發,監聽,初始時也是一個阻塞
case notificationToAdd, ok := <-addCh:
fmt.Println(notificationToAdd, notification)
if !ok {
return
}
// 執行緒安全
// 當訊息為空時,沒有被處理
// 鎖為空,就分發資料
if notification == nil {
fmt.Println("frist notification nil")
notification = notificationToAdd
nextCh = nextCh1 // 這步驟等於初始化了區域性的nextCh,會觸發上面的流程
} else {
// 在第三次時,會走到這裡,資料進入環
fmt.Println("into ring", notificationToAdd)
pendding.WriteOne(notificationToAdd)
}
}
}
}()
// producer
go func() {
i := 0
for {
i++
if i%5 == 0 {
addCh <- fmt.Sprintf("thread 2 inner -- %d", i)
time.Sleep(time.Millisecond * 9000)
} else {
addCh <- fmt.Sprintf("thread 2 outer -- %d", i)
time.Sleep(time.Millisecond * 500)
}
}
}()
// subsriber
go func() {
for {
for next := range nextCh1 {
time.Sleep(time.Millisecond * 300)
fmt.Println("consumer", next)
}
}
}()
<-stopper
}

總結:

這裡的機制類 似於執行緒安全,進入臨界區的一些演算法,臨界區就是  nextChnotification  就是保證了至少有一個程序可以進入臨界區(要麼分發事件,要麼生產事件); nextCh  和  nextCh1  一個是區域性管道一個是全域性的,管道未初始化代表了死鎖(阻塞);當有訊息要處理時,會將區域性管道  nextCh  賦值給 全域性  nextCh1  此時相當於解除了分發的步驟(對管道賦值,觸發分發操作); ringbuffer  實際上是提供了一個對  notification  加鎖的操作,在沒有處理的訊息時,需要保障  notification  為空,同時也關閉了流程  nextCh  的寫入。這裡主要是考慮對golang中channel的用法。

原文連結:

https://www.cnblogs.com/Cylon/p/16311233.html

掃描二維碼 | 加入 雲原生 技術交流群