Istio源码解析3-Istio中配置与服务下发
上一篇 中我们介绍了Istio中服务发现与配置处理,无论是Istio访问外部服务的配置(serviceentry、workloadentry) 、Istio流量规则(virtualservices、destinationrule等)还是Kubernetes原生的服务,在Istio中都是使用informer进行事件的监听,并使用handler进行相关事件的处理,在各个handler处理结束基本都是使用XDSServer.ConfigUpdate把处理好的配置与服务进行XDS的处理,本篇我们详细介绍下Istio是如何与数据面进行交互并进行配置的分发。
作者: 李运田, 中国移动云能力中心软件开发工程师,专注于云原生、Istio、微服务、Spring Cloud 等领域。
01
DiscoveryServer启动
从上面我们看出基本所有的下发都是使用 XDSServer ,首先我们看下 XDSServer 的初始化结构
// DiscoveryServer is Pilot's gRPC implementation for Envoy's xds APIs
type DiscoveryServer struct {
// Pilot 环境所需的 API 集合
Env *model.Environment
// xDS 数据的生成器接口
ConfigGenerator core.ConfigGenerator
// 统一接收其他组件发来的 PushRequest 的 channel
pushChannel chan *model.PushRequest
//WorkloadEntry控制器,serviceEntryController在Server中定义
WorkloadEntryController *workloadentry.Controller
// pushQueue 主要是在真正 xDS 推送前做防抖缓存
pushQueue *PushQueue
// 保存了所有生效的 gRPC 连接
adsClients map[string]*Connection
//防抖参数
debounceOptions debounceOptions
}
前面的文章中我们也大概介绍了下 pilot-discovery 的启动,在这里我们通过下图看下 pilot-discovery 的启动流程以及服务、配置下发过程。
pilot-discovery 启动方法 Start() 中会启动两个协程 handleUpdates 和 sendPushes ,这俩协程的启动及作用在上图中也大概有标明, handleUpdates 主要用来处理 pushChannel 中收到的推送请求以及防抖。 sendPushes 则负责具体的推送。
func (s *DiscoveryServer) Start(stopCh <-chan struct{}) {
go s.WorkloadEntryController.Run(stopCh)
go s.handleUpdates(stopCh)
go s.periodicRefreshMetrics(stopCh)
go s.sendPushes(stopCh)
}
02
客户端envoy连接及配置处理
在 envoy/service/discovery/v3/ads.pb.go 中定义了 RPC 接口
var _AggregatedDiscoveryService_serviceDesc = grpc.ServiceDesc{
Streams: []grpc.StreamDesc{
{
StreamName: "StreamAggregatedResources",
Handler: _AggregatedDiscoveryService_StreamAggregatedResources_Handler,
ServerStreams: true,
ClientStreams: true,
},
{
StreamName: "DeltaAggregatedResources",
Handler: _AggregatedDiscoveryService_DeltaAggregatedResources_Handler,
ServerStreams: true,
ClientStreams: true,
},
},
}
StreamAggregatedResources 接收 DiscoveryRequest 返回 DiscoveryResponse 流,包含全量的 xDS 数据
DeltaAggregatedResources 接收 DeltaDiscoveryRequest ,返回 DeltaDiscoveryResponse 流,包含增量的 xDS 数据
这里我们以处理全量的 xDS 数据为例
func (s *DiscoveryServer) StreamAggregatedResources(stream discovery.AggregatedDiscoveryService_StreamAggregatedResourcesServer) error {
return s.Stream(stream)
}
func (s *DiscoveryServer) Stream(stream DiscoveryStream) error {
//安全认证
ids, err := s.authenticate(ctx)
//建立connection连接
con := newConnection(peerAddr, stream)
//每个connection开一个协程接收放入connection channel中的req
go s.receive(con, ids)
for {
//循环处理channel中的req
select {
case req, ok := <-con.reqChan: // 这里是envoy->istiod
if ok {
if err := s.processRequest(req, con); err != nil {
return err
}
} else {
// Remote side closed connection or error processing the request.
return <-con.errorChan
}
case pushEv := <-con.pushChannel: // 这里是istiod->envoy
err := s.pushConnection(con, pushEv)
pushEv.done()
if err != nil {
return err
}
}
}
}
EnvoyXdsServer 启动完后, 开始接收来自 configController 、 serviceController 的配置变化 以及 K8s 原生服务 事件, 当 服务数据的变化和配置数据的变化 时 ,都会创建 PushRequest 发送至 EnvoyXdsServer 的 pushChannel 中。
放入 pushChannel 中的 PushRequest 后续会通过 handleUpdates 进一步的进行处理, handleUpdates 最重要的功能就是防抖,避免因过快的推送带来的问题和压力。
防抖处理的主要函数
// The debounce helper function is implemented to enable mocking
func debounce(ch chan *model.PushRequest, stopCh <-chan struct{}, opts debounceOptions, pushFn func(req *model.PushRequest), updateSent *atomic.Int64) {
push := func(req *model.PushRequest, debouncedEvents int) {
pushFn(req)
updateSent.Add(int64(debouncedEvents))
freeCh <- struct{}{}
}
pushWorker := func() {
eventDelay := time.Since(startDebounce)
quietTime := time.Since(lastConfigUpdateTime)
// it has been too long or quiet enough
//当事件的延迟时间大于等于最大延迟时间或静默时间大于等于最小静默时间,才会执行 push() 方法
if eventDelay >= opts.debounceMax || quietTime >= opts.debounceAfter {
if req != nil {
pushCounter++
free = false
go push(req, debouncedEvents)
req = nil
debouncedEvents = 0
}
} else {
timeChan = time.After(opts.debounceAfter - quietTime)
}
}
for {
select {
case <-freeCh:
free = true
pushWorker()
case r := <-ch:
// If reason is not set, record it as an unknown reason
if len(r.Reason) == 0 {
r.Reason = []model.TriggerReason{model.UnknownTrigger}
}
if !opts.enableEDSDebounce && !r.Full {
// trigger push now, just for EDS
go func(req *model.PushRequest) {
pushFn(req)
updateSent.Inc()
}(r)
continue
}
lastConfigUpdateTime = time.Now()
if debouncedEvents == 0 {
timeChan = time.After(opts.debounceAfter)
startDebounce = lastConfigUpdateTime
}
debouncedEvents++
//合并连续发生的多个PushRequest
req = req.Merge(r)
case <-timeChan:
if free {
pushWorker()
}
case <-stopCh:
return
}
}
}
经过防抖处理后会调用s.Push进行req的推送
func (s *DiscoveryServer) startPush(req *model.PushRequest) {
// Push config changes, iterating over connected envoys
req.Start = time.Now()
//对连接的所有envoy客户端connection进行req推送
for _, p := range s.AllClients() {
s.pushQueue.Enqueue(p, req)
}
}
s.pushQueue中保存了所有客户端envoy的connection信息以及PushRequest信息,具体的PushQueue结构如下
type PushQueue struct {
cond *sync.Cond
// pending stores all connections in the queue. If the same connection is enqueued again,
// the PushRequest will be merged.
//pending保存了所有代理gRPC连接的PushRequest ,如果相同连接的PushRequest再次入队,将会被合并
pending map[*Connection]*model.PushRequest
// 所有的envoy连接connection
queue []*Connection
// processing stores all connections that have been Dequeue(), but not MarkDone().
// The value stored will be initially be nil, but may be populated if the connection is Enqueue().
// If model.PushRequest is not nil, it will be Enqueued again once MarkDone has been called.
processing map[*Connection]*model.PushRequest
shuttingDown bool
}
到这里就把集群中监听到的 Istio CRD 配置事件以及 K8s 的服务事件都入队到 PushQueue 结构中,后面会调用 doSendPushes 从 PushQueue 结构中获取每个 connection 及 req 信息进行配置和服务的推送。
03
配置与服务的推送
通过 doSendPushes 从 pushQueue 中通过 Dequeue() 方法获取 每个 客户端 connection 和对应的 PushRequest ,再根据 PushRequest 生成 pushEv 传入客户端 connection 的 pushChannel 中。
func doSendPushes(stopCh <-chan struct{}, semaphore chan struct{}, queue *PushQueue) {
for {
select {
case <-stopCh:
return
default:
//从pushQueue中通过Dequeue()方法获取每个客户端connection和对应的PushRequest
client, push, shuttingdown := queue.Dequeue()
if shuttingdown {
return
}
recordPushTriggers(push.Reason...)
// Signals that a push is done by reading from the semaphore, allowing another send on it.
doneFunc := func() {
queue.MarkDone(client)
<-semaphore
}
proxiesQueueTime.Record(time.Since(push.Start).Seconds())
var closed <-chan struct{}
if client.stream != nil {
closed = client.stream.Context().Done()
} else {
closed = client.deltaStream.Context().Done()
}
go func() {
//生成pushEv
pushEv := &Event{
pushRequest: push,
done: doneFunc,
}
select {
//把pushEv传入每个客户端的channel中
case client.pushChannel <- pushEv:
return
case <-closed: // grpc stream was closed
doneFunc()
log.Infof("Client closed connection %v", client.conID)
}
}()
}
}
}
在前面介绍 pilot/pkg/xds/ads.go 中 StreamAggregatedResources接受每个客户端连接请求 时,有如下代码
case pushEv := <-con.pushChannel: // 这里是istiod->envoy
err := s.pushConnection(con, pushEv)
pushEv.done()
if err != nil {
return err
}
这样 把 pushEv 传入每个客户端的 channel 中 后,会调用此处的 pushEv := <-con.pushChanne l 获取到 pushEv 并使用 s.pushConnection(con, pushEv) 进行配置的推送
// Compute and send the new configuration for a connection.
func (s *DiscoveryServer) pushConnection(con *Connection, pushEv *Event) error {
pushRequest := pushEv.pushRequest
//直接进行全量推送
if pushRequest.Full {
// Update Proxy with current information.
s.updateProxy(con.proxy, pushRequest)
}
if !s.ProxyNeedsPush(con.proxy, pushRequest) {
log.Debugf("Skipping push to %v, no updates required", con.conID)
if pushRequest.Full {
// Only report for full versions, incremental pushes do not have a new version.
reportAllEvents(s.StatusReporter, con.conID, pushRequest.Push.LedgerVersion, nil)
}
return nil
}
// Send pushes to all generators
// Each Generator is responsible for determining if the push event requires a push
wrl, ignoreEvents := con.pushDetails()
for _, w := range wrl {
if err := s.pushXds(con, w, pushRequest); err != nil {
return err
}
}
if pushRequest.Full {
// Report all events for unwatched resources. Watched resources will be reported in pushXds or on ack.
reportAllEvents(s.StatusReporter, con.conID, pushRequest.Push.LedgerVersion, ignoreEvents)
}
proxiesConvergeDelay.Record(time.Since(pushRequest.Start).Seconds())
return nil
}
04
总结
文中介绍了 EnvoyXdsServer 的结构以及 EnvoyXdsServer 的启动流程、怎么与 envoy 客户端建立连接,当 Istio CRD 配置、 K8s 服务事件变化后,怎么监控到事件并把相关配置传到 EnvoyXdsServer 的 channel 中,如何进行防抖及推送,最后把事件传到每个客户端的 connection 中。
点击【阅读原文】到达Istio网站。
CNCF概况(幻灯片)
扫描二维码联系我们!
CNCF (Cloud Native Computing Foundation)成立于2015年12月,隶属于Linux Foundation,是非营利性组织。
CNCF ( 云原生计算基金会 )致力于培育和维护一个厂商中立的开源生态系统,来推广云原生技术。我们通过将最前沿的模式民主化,让这些创新为大众所用。请长按以下二维码进行关注。