Client-go 源码分析之 SharedInformer

语言: CN / TW / HK

作者:KaliArch(薛磊),某 Cloud MSP 服务商产品负责人,熟悉企业级高可用 / 高并发架构,包括混合云架构、异地灾备,熟练企业 DevOps 改造优化,熟悉 Shell/Python/Go 等开发语言,熟悉 Kubernetes、 Docker、云原生、微服务架构等。

前言

上篇单独就 Informer 中的 Controller 来看,processFunc 以一个参数单独穿入 NewInformer 中,如果有另一个程序需要处理相同的资源,那么就需要另外再创建一个 Informer 对象,而队列也无法复用,队列不能被两个消费者同时消费,因此在 Client-go 中又设计有 ShareInformer,后续的示例包括 K8s 的控制器中也都适用的是此类共享型的对象。

相关概念

资源 Informer

  • 每一种资源都实现了 Informer 机制,允许监控不同的资源事件;

  • 每一个 Informer 都会实现 Informer 和 Lister 方法。

type PodInformer interface {
Informer() cache.SharedIndexInformer
Lister() v1.PodLister
}

SharedInformer

若同一个资源的 Informer 被实例化了多次,每个 Informer 使用一个 Reflector,那么会运行过多相同的 ListAndWatch,太多重复的序列化和反序列化操作会导致 api-server 负载过重。

SharedInformer 可以使同一类资源 Informer 共享一个 Reflector。内部定义了一个 map 字段,用于存放所有 Infromer 的字段。

通常会使用 informerFactory 来管理控制器需要的多个资源对象的 Informer 实例,例如创建一个 deployment 的 Informer。

// 创建一个Informer factory
sharedInformerFactory := informers.NewSharedInformerFactory(clientSet, 0)
// factory已经为所有k8s的内置资源对象提供了创建对应Informer实例的方法,调用具体Informer实例的Lister或Informer方法
// 就完成了将Informer注册到factory的过程
deploymentLister := sharedInformerFactory.Apps().V1().Deployments().Lister()
// 启动注册到factory的所有Informer
kubeInformerFactory.Start(stopCh)

SharedInformer 是一个接口,包含添加事件,当有资源变化时,会回掉通知使用者,启动函数及获取是否全利卿对象已经同步到本地存储中。

type SharedInformer interface {
// 添加资源事件处理器,当有资源变化时就会通过回调通知使用者
AddEventHandler(handler ResourceEventHandler)
AddEventHandlerWithResyncPeriod(handler ResourceEventHandler, resyncPeriod time.Duration)

// 获取一个 Store 对象
GetStore() Store
// 主要是用来将 Reflector 和 DeltaFIFO 组合到一起工作
GetController() Controller

// SharedInformer 的核心实现,启动并运行这个 SharedInformer
// 当 stopCh 关闭时候,Informer 才会退出
Run(stopCh <-chan struct{})

// 告诉使用者全量的对象是否已经同步到了本地存储中
HasSynced() bool
// 最新同步资源的版本
LastSyncResourceVersion() string
}

// SharedIndexInformer在SharedInformer基础上扩展了添加和获取Indexers的能力
type SharedIndexInformer interface {
SharedInformer
// 在启动之前添加 indexers 到 Informer 中
AddIndexers(indexers Indexers) error
GetIndexer() Indexer
}

源码分析

SharedInformerFactory

SharedInformerFactory 为所有已知 API 组版本中的资源提供共享 Informer。

type SharedInformerFactory interface {
internalinterfaces.SharedInformerFactory
ForResource(resource schema.GroupVersionResource) (GenericInformer, error)
WaitForCacheSync(stopCh <-chan struct{}) map[reflect.Type]bool

Admissionregistration() admissionregistration.Interface
Internal() apiserverinternal.Interface
Apps() apps.Interface
Autoscaling() autoscaling.Interface
Batch() batch.Interface
Certificates() certificates.Interface
Coordination() coordination.Interface
Core() core.Interface
Discovery() discovery.Interface
Events() events.Interface
Extensions() extensions.Interface
Flowcontrol() flowcontrol.Interface
Networking() networking.Interface
Node() node.Interface
Policy() policy.Interface
Rbac() rbac.Interface
Scheduling() scheduling.Interface
Storage() storage.Interface
}

这里的 Informer 实现是 shareIndexInformer NewSharedInformerFactory 调用了 NewSharedInformerFactoryWithOptions,将返回一个 sharedInformerFactory 对象。

type sharedInformerFactory struct {
client kubernetes.Interface
//关注的namepace,可以通过WithNamespace Option配置
namespace string
tweakListOptions internalinterfaces.TweakListOptionsFunc
lock sync.Mutex
defaultResync time.Duration
customResync map[reflect.Type]time.Duration
//针对每种类型资源存储一个Informer,Informer的类型是ShareIndexInformer
informers map[reflect.Type]cache.SharedIndexInformer
// startedInformers is used for tracking which informers have been started.
// This allows Start() to be called multiple times safely.
startedInformers map[reflect.Type]bool
}

// SharedInformerFactory 构造方法
func NewSharedInformerFactoryWithOptions(client kubernetes.Interface, defaultResync time.Duration, options ...SharedInformerOption) SharedInformerFactory {
factory := &sharedInformerFactory{
client: client,
namespace: v1.NamespaceAll,
defaultResync: defaultResync,
// 初始化map
informers: make(map[reflect.Type]cache.SharedIndexInformer),
startedInformers: make(map[reflect.Type]bool),
customResync: make(map[reflect.Type]time.Duration),
}

// Apply all options
for _, opt := range options {
factory = opt(factory)
}

return factory
}

Start

启动 factory 下的所有 Informer,其实就是启动每个 Informer 中的 Reflector。

// Start initializes all requested informers.
func (f *sharedInformerFactory) Start(stopCh <-chan struct{}) {
f.lock.Lock()
defer f.lock.Unlock()
// 启动Informers中所有Informer
for informerType, informer := range f.informers {
if !f.startedInformers[informerType] {
go informer.Run(stopCh)
f.startedInformers[informerType] = true
}
}
}

WaitForCacheSync

  • sharedInformerFactory 的 WaitForCacheSync 将会不断调用 factory 持有的所有 Informer 的 HasSynced 方法,直到返回 true;

  • 而 Informer 的 HasSynced 方法调用的自己持有的 Controller 的 HasSynced 方法(Informer 结构持有 Controller 对象,下文会分析 Informer 的结构);

  • Informer 中的 Controller 的 HasSynced 方法则调用的是 Controller 持有的 deltaFIFO 对象的 HasSynced 方法。

也就说 sharedInformerFactory 的 WaitForCacheSync 方法判断 Informer 的 cache 是否同步,最终看的是 Informer 中的 deltaFIFO 是否同步了。

func (f *sharedInformerFactory) WaitForCacheSync(stopCh <-chan struct{}) map[reflect.Type]bool {
informers := func() map[reflect.Type]cache.SharedIndexInformer {
f.lock.Lock()
defer f.lock.Unlock()

informers := map[reflect.Type]cache.SharedIndexInformer{}
for informerType, informer := range f.informers {
if f.startedInformers[informerType] {
informers[informerType] = informer
}
}
return informers
}()

res := map[reflect.Type]bool{}
for informType, informer := range informers {
// 调用Informer中的HasSynced
res[informType] = cache.WaitForCacheSync(stopCh, informer.HasSynced)
}
return res
}

InformerFor

只有向 factory 中添加 Informer,factory 才有意义,obj: informer 关注的资源如 deployment{} newFunc: 一个知道如何创建指定 Informer 的方法,k8s 为每一个内置的对象都实现了这个方法,比如创建 deployment 的 ShareIndexInformer 的方法。

func (f *sharedInformerFactory) InformerFor(obj runtime.Object, newFunc internalinterfaces.NewInformerFunc) cache.SharedIndexInformer {
f.lock.Lock()
defer f.lock.Unlock()
// 获取对象类型
informerType := reflect.TypeOf(obj)
informer, exists := f.informers[informerType]
if exists {
return informer
}

resyncPeriod, exists := f.customResync[informerType]
if !exists {
resyncPeriod = f.defaultResync
}
// 根据用户传入的Informer生成方法生成Informer
informer = newFunc(f.client, resyncPeriod)
// 将Informer添加进SharedIndexInformer
f.informers[informerType] = informer

return informer
}

shareIndexInformer 对应的 newFunc 的实现,client-go 中已经为所有内置对象都提供了 NewInformerFunc,例如 podinformer。

// podinformer 接口,包含Informer/Lister两个方法
type PodInformer interface {
Informer() cache.SharedIndexInformer
Lister() v1.PodLister
}

// podInformer结构体,包含SharedInformerFactory接口和ns
type podInformer struct {
factory internalinterfaces.SharedInformerFactory
tweakListOptions internalinterfaces.TweakListOptionsFunc
namespace string
}

func NewPodInformer(client kubernetes.Interface, namespace string, resyncPeriod time.Duration, indexers cache.Indexers) cache.SharedIndexInformer {
return NewFilteredPodInformer(client, namespace, resyncPeriod, indexers, nil)
}

// 真正生成SharedIndexInformer,在其中添加了eventHandler
func NewFilteredPodInformer(client kubernetes.Interface, namespace string, resyncPeriod time.Duration, indexers cache.Indexers, tweakListOptions internalinterfaces.TweakListOptionsFunc) cache.SharedIndexInformer {
return cache.NewSharedIndexInformer(
&cache.ListWatch{
ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
if tweakListOptions != nil {
tweakListOptions(&options)
}
return client.CoreV1().Pods(namespace).List(context.TODO(), options)
},
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
if tweakListOptions != nil {
tweakListOptions(&options)
}
return client.CoreV1().Pods(namespace).Watch(context.TODO(), options)
},
},
&corev1.Pod{},
resyncPeriod,
indexers,
)
}

func (f *podInformer) defaultInformer(client kubernetes.Interface, resyncPeriod time.Duration) cache.SharedIndexInformer {
return NewFilteredPodInformer(client, f.namespace, resyncPeriod, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}, f.tweakListOptions)
}

func (f *podInformer) Informer() cache.SharedIndexInformer {
return f.factory.InformerFor(&corev1.Pod{}, f.defaultInformer)
}

func (f *podInformer) Lister() v1.PodLister {
return v1.NewPodLister(f.Informer().GetIndexer())
}

调用 sharedInformerFactory.Core().V1().Pods(),就掉用了 podInformer 构造函数,生成 Podinformer 对象。

// Pods returns a PodInformer.
func (v *version) Pods() PodInformer {
return &podInformer{factory: v.factory, namespace: v.namespace, tweakListOptions: v.tweakListOptions}
}

cache 包暴露的 Informer 创建方法有以下 5 个:

  • New

  • NewInformer

  • NewIndexerInformer

  • NewSharedInformer

  • NewSharedIndexInformer

它们有着不同程度的抽象和封装,NewSharedIndexInformer 是其中抽象程度最低,封装程度最高的一个,但即使是 NewSharedIndexInformer,也没有封装具体的资源类型,需要接收 ListerWatcher 和 Indexers 作为参数:

func NewSharedIndexInformer(
lw ListerWatcher,
exampleObject runtime.Object,
defaultEventHandlerResyncPeriod time.Duration,
indexers Indexers,
)
SharedIndexInformer
{}

sharedProcessor

sharedProcessor 有一个 processorListener 的集合,可以将一个通知对象分发给它的监听器。有两种分发操作。同步分发到侦听器的子集,(a) 在偶尔调用 shouldResync 时重新计算,(b) 每个侦听器最初都被放入。非同步分发到每个侦听器。在 SharedInformer 中有非常重要的一个属性 sharedProcessor,其包含了 processorListener,来通知从 sharedProcessor 到 ResourceEventHandler,其使用两个无缓冲 chanel,两个 goroutines 和一个无界环形缓冲区,一个 goroutine 运行 pop() ,它使用环形缓冲区中的存储将通知从 addCh 泵到 nextCh ,而 nextCh 没有跟上。另一个 goroutine 运行 run() ,它接收来自 nextCh 的通知并同步调用适当的处理程序方法。

type sharedProcessor struct {
// 所有processor是否都已经启动
listenersStarted bool
listenersLock sync.RWMutex
// 通用处理列表
listeners []*processorListener
// 定时同步的处理列表
syncingListeners []*processorListener
clock clock.Clock
wg wait.Group
}

// processorListener 通知
type processorListener struct {
nextCh chan interface{}
addCh chan interface{} // 添加事件的通道

handler ResourceEventHandler

// pendingNotifications 是一个无边界的环形缓冲区,用于保存所有尚未分发的通知
pendingNotifications buffer.RingGrowing

requestedResyncPeriod time.Duration

resyncPeriod time.Duration

nextResync time.Time

resyncLock sync.Mutex
}
  • add:事件添加通过 addCh 通道接受,notification 就是事件,也就是从 DeltaFIFO 弹出的 Deltas,addCh 是一个无缓冲通道,所以可以将其看作一个事件分发器,从 DeltaFIFO 弹出的对象需要逐一送到多个处理器,如果处理器没有及时处理 addCh 则会被阻塞。

func (p *processorListener) add(notification interface{}) {
p.addCh <- notification
}
  • pop:利用 golang select 来同时处理多个 channel,直到至少有一个 case 表达式满足条件为止。

func (p *processorListener) pop() {
defer utilruntime.HandleCrash()
// 通知run停止函数运行
defer close(p.nextCh) // Tell .run() to stop

var nextCh chan<- interface{}
var notification interface{}
for {
select {
// 由于nextCh还没有进行初始化,在此会zuse
case nextCh <- notification:
// 通知分发,
var ok bool
notification, ok = p.pendingNotifications.ReadOne()
if !ok { // 没有事件被Pop,则设置nextCh为nil
nextCh = nil // Disable 这个 select的 case
}
// 从p.addCh 中读取一个事件,消费addCh
case notificationToAdd, ok := <-p.addCh:
if !ok { // 如果关闭则直接退出
return
}
// pendingNotifications 为空,则说明没有notification 去pop
if notification == nil {
// 把刚刚获取的事件通过 p.nextCh 发送给处理器
notification = notificationToAdd
nextCh = p.nextCh
} else {
// 上一个事件还没发送完成(已经有一个通知等待发送),就先放到缓冲通道中
p.pendingNotifications.WriteOne(notificationToAdd)
}
}
}
}

run() 和 pop() 是 processorListener 的两个最核心的函数,processorListener 就是实现了事件的缓冲和处理,在没有事件的时候可以阻塞处理器,当事件较多是可以把事件缓冲起来,实现了事件分发器与处理器的异步处理。processorListener 的 run() 和 pop() 函数其实都是通过 sharedProcessor 启动的协程来调用的,所以下面我们再来对 sharedProcessor 进行分析了。首先看下如何添加一个 processorListener:


// 添加处理器
func (p *sharedProcessor) addListener(listener *processorListener) {
p.listenersLock.Lock() // 加锁
defer p.listenersLock.Unlock()
// 调用 addListenerLocked 函数
p.addListenerLocked(listener)
// 如果事件处理列表中的处理器已经启动了,则手动启动下面的两个协程
// 相当于启动后了
if p.listenersStarted {
// 通过 wait.Group 启动两个协程,就是上面我们提到的 run 和 pop 函数
p.wg.Start(listener.run)
p.wg.Start(listener.pop)
}
}

// 将处理器添加到处理器的列表中
func (p *sharedProcessor) addListenerLocked(listener *processorListener) {
// 添加到通用处理器列表中
p.listeners = append(p.listeners, listener)
// 添加到需要定时同步的处理器列表中
p.syncingListeners = append(p.syncingListeners, listener)
}

这里添加处理器的函数 addListener 其实在 sharedIndexInformer 中的 AddEventHandler 函数中就会调用这个函数来添加处理器。然后就是事件分发的函数实现:

func (p *sharedProcessor) distribute(obj interface{}, sync bool) {
p.listenersLock.RLock()
defer p.listenersLock.RUnlock()
// sync 表示 obj 对象是否是同步事件对象
// 将对象分发给每一个事件处理器列表中的处理器
if sync {
for _, listener := range p.syncingListeners {
listener.add(obj)
}
} else {
for _, listener := range p.listeners {
listener.add(obj)
}
}
}

然后就是将 sharedProcessor 运行起来:

func (p *sharedProcessor) run(stopCh <-chan struct{}) {
func() {
p.listenersLock.RLock()
defer p.listenersLock.RUnlock()
// 遍历所有的处理器,为处理器启动两个后台协程:run 和 pop 操作
// 后续添加的处理器就是在上面的 addListener 中去启动的
for _, listener := range p.listeners {
p.wg.Start(listener.run)
p.wg.Start(listener.pop)
}
// 标记为所有处理器都已启动
p.listenersStarted = true
}()
// 等待退出信号
<-stopCh
// 接收到退出信号后,关闭所有的处理器
p.listenersLock.RLock()
defer p.listenersLock.RUnlock()
// 遍历所有处理器
for _, listener := range p.listeners {
// 关闭 addCh,pop 会停止,pop 会通知 run 停止
close(listener.addCh)
}
// 等待所有协程退出,就是上面所有处理器中启动的两个协程 pop 与 run
p.wg.Wait()
}

到这里 sharedProcessor 就完成了对 ResourceEventHandler 的封装处理,当然最终 sharedProcessor 还是在 SharedInformer 中去调用的。

sharedIndexInformer 的 HandleDeltas 处理从 deltaFIFO pod 出来的增量时,先尝试更新到本地缓存 cache,更新成功的话会调用 processor.distribute 方法向 processor 中的 listener 添加 notification,listener 启动之后会不断获取 notification 回调用户的 EventHandler 方法。

在此就不在过度展开,更详细的内容还需要去看 K8s 源码。

小试牛刀

添加 event 事件

添加有 AddEventHandler,首次 list 关注的资源后,后期通过 watch 判断资源状态执行相应操作,通过 indexer 来缓存中获取对象。

func Muste(e error) {
if e != nil {
panic(e)
}
}
func InitClientSet() *kubernetes.Clientset {
kubeconfig := filepath.Join(homedir.HomeDir(), ".kube", "config")
restConfig, err := clientcmd.BuildConfigFromFlags("", kubeconfig)
Muste(err)
return kubernetes.NewForConfigOrDie(restConfig)
}

func main() {
clientSet := InitClientSet()
// 初始化sharedInformerFactory
sharedInformerFactory := informers.NewSharedInformerFactory(clientSet, 0)
// 生成podInformer
podInformer := sharedInformerFactory.Core().V1().Pods()
// 生成具体informer/indexer
informer := podInformer.Informer()
indexer := podInformer.Lister()
// 添加Event事件处理函数
informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
func(obj interface{}) {
podObj := obj.(*corev1.Pod)

fmt.Printf("AddFunc: %s\n", podObj.GetName())

},
func(oldObj, newObj interface{}) {
oldPodObj := oldObj.(*corev1.Pod)
newPodObj := newObj.(*corev1.Pod)

fmt.Printf("old: %s\n", oldPodObj.GetName())
fmt.Printf("new: %s\n", newPodObj.GetName())

},
func(obj interface{}) {
podObj := obj.(*corev1.Pod)
fmt.Printf("deleteFunc: %s\n", podObj.GetName())
},
})

stopCh := make(chan struct{})
defer close(stopCh)
// 启动Informer
sharedInformerFactory.Start(stopCh)
// 等待同步完成
sharedInformerFactory.WaitForCacheSync(stopCh)

// 利用indexer获取资源
pods, err := indexer.List(labels.Everything())
Muste(err)
for _, items := range pods {
fmt.Printf("namespace: %s, name:%s\n", items.GetNamespace(), items.GetName())
}
<-stopCh
}

通过 Gin 实现 K8s 资源获取

单个资源对象获取

通过生成 deployinformer, 通过 gin 框架暴露访问入口,通过 Informer 中的 indexer 来获取 deployment 资源。

func main() {

clientSet, err := initClientSet()
if err != nil {
log.Fatal(err)
}
sharedInformerFactory := informers.NewSharedInformerFactory(clientSet, 0)
// 获取deployinformeer
deployInformer := sharedInformerFactory.Apps().V1().Deployments()
_ = deployInformer.Informer()
lister := deployInformer.Lister()
stopCh := make(chan struct{})
sharedInformerFactory.Start(stopCh)
sharedInformerFactory.WaitForCacheSync(stopCh)

// http://localhost:8888/deploy?labels[k8s-app]=kube-dns
r := gin.Default()
r.GET("/deploy", func(c *gin.Context) {
var set map[string]string
// 通过labels过滤deploy
if content, ok := c.GetQueryMap("labels"); ok {
set = content
}
selectQuery := labels.SelectorFromSet(set)

podList, err := lister.List(selectQuery)
if err != nil {
c.JSON(400, gin.H{
"msg": err.Error(),
})
return
}
c.JSON(200, gin.H{
"data": podList,
})

})

r.Run(":8888")

//<-stopCh

}

根据路径参数获取不同资源

之前的示例可以获取制定资源,下面示例将 gvr 通过 gin 路径参数传递进来,进而实现一个 API 可以获取 K8s 所有资源。

func main() {
clientSet := InitClientSet()
sharedInformerFactory := informers.NewSharedInformerFactory(clientSet, 0)
stopCh := make(chan struct{})
defer close(stopCh)

engin := gin.Default()
// 构造一个通用的根据不同group/version/kind 获取对象类型
//http://localhost:8888/core/v1/pods
engin.GET("/:group/:version/:resource", func(c *gin.Context) {
g, v, r := c.Param("group"), c.Param("version"), c.Param("resource")
// 如果是core组则为空字符串
if g == "core" {
g = ""
}
gvr := schema.GroupVersionResource{g, v, r}
genericInformer, _ := sharedInformerFactory.ForResource(gvr)
_ = genericInformer.Informer()
sharedInformerFactory.Start(stopCh)
sharedInformerFactory.WaitForCacheSync(stopCh)
items, err := genericInformer.Lister().List(labels.Everything())
if err != nil {
c.JSON(500, gin.H{
"msg": err,
})
}
c.JSON(200, gin.H{
"msg": items,
})
})

engin.Run(":8888")
}

总结

在以上的内容中,我们分析了 Informers 包的源代码,并简单测试了 Informer 的原理,更详细的内容还是需要自己去看源码领会,后续看完 indexer 后,我们自己手动去编写一个 Controller 来加深对 Informer 整个流程的理解。

参考链接

  • https://jimmysong.io/kubernetes-handbook/develop/client-go-informer-sourcecode-analyse.html

KubeSphere (https://kubesphere.io)是在 Kubernetes 之上构建的 开源容器混合云 ,提供全栈的 IT 自动化运维的能力,简化企业的 DevOps 工作流。

KubeSphere 已被  Aqara 智能家居、爱立信、本来生活、东软、华云、新浪、三一重工、华夏银行、四川航空、国药集团、微众银行、杭州数跑科技、紫金保险、去哪儿网、中通、中国人民银行、中国银行、中国人保寿险、中国太平保险、中国移动、中国联通、中国电信、天翼云、中移金科、Radore、ZaloPay  等海内外数千家企业采用。KubeSphere 提供了开发者友好的向导式操作界面和丰富的企业级功能,包括  Kubernetes   多云与多集群管理、DevOps (CI/CD)、应用生命周期管理、边缘计算、微服务治理 (Service Mesh)、多租户管理、可观测性、存储与网络管理、GPU support  等功能,帮助企业快速构建一个强大和功能丰富的容器云平台。

 :sparkles: GitHub :https://github.com/kubesphere

 :computer: 官网(中国站) :https://kubesphere.com.cn

:man:‍:computer:‍  微信群: 请搜索添加群助手微信号  kubesphere

 :link: 企业服务 https://kubespher e.cloud