Istio原始碼解析3-Istio中配置與服務下發
上一篇 中我們介紹了Istio中服務發現與配置處理,無論是Istio訪問外部服務的配置(serviceentry、workloadentry) 、Istio流量規則(virtualservices、destinationrule等)還是Kubernetes原生的服務,在Istio中都是使用informer進行事件的監聽,並使用handler進行相關事件的處理,在各個handler處理結束基本都是使用XDSServer.ConfigUpdate把處理好的配置與服務進行XDS的處理,本篇我們詳細介紹下Istio是如何與資料面進行互動並進行配置的分發。
作者: 李運田, 中國移動雲能力中心軟體開發工程師,專注於雲原生、Istio、微服務、Spring Cloud 等領域。
01
DiscoveryServer啟動
從上面我們看出基本所有的下發都是使用 XDSServer ,首先我們看下 XDSServer 的初始化結構
// DiscoveryServer is Pilot's gRPC implementation for Envoy's xds APIs
type DiscoveryServer struct {
// Pilot 環境所需的 API 集合
Env *model.Environment
// xDS 資料的生成器介面
ConfigGenerator core.ConfigGenerator
// 統一接收其他元件發來的 PushRequest 的 channel
pushChannel chan *model.PushRequest
//WorkloadEntry控制器,serviceEntryController在Server中定義
WorkloadEntryController *workloadentry.Controller
// pushQueue 主要是在真正 xDS 推送前做防抖快取
pushQueue *PushQueue
// 儲存了所有生效的 gRPC 連線
adsClients map[string]*Connection
//防抖引數
debounceOptions debounceOptions
}
前面的文章中我們也大概介紹了下 pilot-discovery 的啟動,在這裡我們通過下圖看下 pilot-discovery 的啟動流程以及服務、配置下發過程。
pilot-discovery 啟動方法 Start() 中會啟動兩個協程 handleUpdates 和 sendPushes ,這倆協程的啟動及作用在上圖中也大概有標明, handleUpdates 主要用來處理 pushChannel 中收到的推送請求以及防抖。 sendPushes 則負責具體的推送。
func (s *DiscoveryServer) Start(stopCh <-chan struct{}) {
go s.WorkloadEntryController.Run(stopCh)
go s.handleUpdates(stopCh)
go s.periodicRefreshMetrics(stopCh)
go s.sendPushes(stopCh)
}
02
客戶端envoy連線及配置處理
在 envoy/service/discovery/v3/ads.pb.go 中定義了 RPC 介面
var _AggregatedDiscoveryService_serviceDesc = grpc.ServiceDesc{
Streams: []grpc.StreamDesc{
{
StreamName: "StreamAggregatedResources",
Handler: _AggregatedDiscoveryService_StreamAggregatedResources_Handler,
ServerStreams: true,
ClientStreams: true,
},
{
StreamName: "DeltaAggregatedResources",
Handler: _AggregatedDiscoveryService_DeltaAggregatedResources_Handler,
ServerStreams: true,
ClientStreams: true,
},
},
}
StreamAggregatedResources 接收 DiscoveryRequest 返回 DiscoveryResponse 流,包含全量的 xDS 資料
DeltaAggregatedResources 接收 DeltaDiscoveryRequest ,返回 DeltaDiscoveryResponse 流,包含增量的 xDS 資料
這裡我們以處理全量的 xDS 資料為例
func (s *DiscoveryServer) StreamAggregatedResources(stream discovery.AggregatedDiscoveryService_StreamAggregatedResourcesServer) error {
return s.Stream(stream)
}
func (s *DiscoveryServer) Stream(stream DiscoveryStream) error {
//安全認證
ids, err := s.authenticate(ctx)
//建立connection連線
con := newConnection(peerAddr, stream)
//每個connection開一個協程接收放入connection channel中的req
go s.receive(con, ids)
for {
//迴圈處理channel中的req
select {
case req, ok := <-con.reqChan: // 這裡是envoy->istiod
if ok {
if err := s.processRequest(req, con); err != nil {
return err
}
} else {
// Remote side closed connection or error processing the request.
return <-con.errorChan
}
case pushEv := <-con.pushChannel: // 這裡是istiod->envoy
err := s.pushConnection(con, pushEv)
pushEv.done()
if err != nil {
return err
}
}
}
}
EnvoyXdsServer 啟動完後, 開始接收來自 configController 、 serviceController 的配置變化 以及 K8s 原生服務 事件, 當 服務資料的變化和配置資料的變化 時 ,都會建立 PushRequest 傳送至 EnvoyXdsServer 的 pushChannel 中。
放入 pushChannel 中的 PushRequest 後續會通過 handleUpdates 進一步的進行處理, handleUpdates 最重要的功能就是防抖,避免因過快的推送帶來的問題和壓力。
防抖處理的主要函式
// The debounce helper function is implemented to enable mocking
func debounce(ch chan *model.PushRequest, stopCh <-chan struct{}, opts debounceOptions, pushFn func(req *model.PushRequest), updateSent *atomic.Int64) {
push := func(req *model.PushRequest, debouncedEvents int) {
pushFn(req)
updateSent.Add(int64(debouncedEvents))
freeCh <- struct{}{}
}
pushWorker := func() {
eventDelay := time.Since(startDebounce)
quietTime := time.Since(lastConfigUpdateTime)
// it has been too long or quiet enough
//當事件的延遲時間大於等於最大延遲時間或靜默時間大於等於最小靜默時間,才會執行 push() 方法
if eventDelay >= opts.debounceMax || quietTime >= opts.debounceAfter {
if req != nil {
pushCounter++
free = false
go push(req, debouncedEvents)
req = nil
debouncedEvents = 0
}
} else {
timeChan = time.After(opts.debounceAfter - quietTime)
}
}
for {
select {
case <-freeCh:
free = true
pushWorker()
case r := <-ch:
// If reason is not set, record it as an unknown reason
if len(r.Reason) == 0 {
r.Reason = []model.TriggerReason{model.UnknownTrigger}
}
if !opts.enableEDSDebounce && !r.Full {
// trigger push now, just for EDS
go func(req *model.PushRequest) {
pushFn(req)
updateSent.Inc()
}(r)
continue
}
lastConfigUpdateTime = time.Now()
if debouncedEvents == 0 {
timeChan = time.After(opts.debounceAfter)
startDebounce = lastConfigUpdateTime
}
debouncedEvents++
//合併連續發生的多個PushRequest
req = req.Merge(r)
case <-timeChan:
if free {
pushWorker()
}
case <-stopCh:
return
}
}
}
經過防抖處理後會呼叫s.Push進行req的推送
func (s *DiscoveryServer) startPush(req *model.PushRequest) {
// Push config changes, iterating over connected envoys
req.Start = time.Now()
//對連線的所有envoy客戶端connection進行req推送
for _, p := range s.AllClients() {
s.pushQueue.Enqueue(p, req)
}
}
s.pushQueue中儲存了所有客戶端envoy的connection資訊以及PushRequest資訊,具體的PushQueue結構如下
type PushQueue struct {
cond *sync.Cond
// pending stores all connections in the queue. If the same connection is enqueued again,
// the PushRequest will be merged.
//pending儲存了所有代理gRPC連線的PushRequest ,如果相同連線的PushRequest再次入隊,將會被合併
pending map[*Connection]*model.PushRequest
// 所有的envoy連線connection
queue []*Connection
// processing stores all connections that have been Dequeue(), but not MarkDone().
// The value stored will be initially be nil, but may be populated if the connection is Enqueue().
// If model.PushRequest is not nil, it will be Enqueued again once MarkDone has been called.
processing map[*Connection]*model.PushRequest
shuttingDown bool
}
到這裡就把叢集中監聽到的 Istio CRD 配置事件以及 K8s 的服務事件都入隊到 PushQueue 結構中,後面會呼叫 doSendPushes 從 PushQueue 結構中獲取每個 connection 及 req 資訊進行配置和服務的推送。
03
配置與服務的推送
通過 doSendPushes 從 pushQueue 中通過 Dequeue() 方法獲取 每個 客戶端 connection 和對應的 PushRequest ,再根據 PushRequest 生成 pushEv 傳入客戶端 connection 的 pushChannel 中。
func doSendPushes(stopCh <-chan struct{}, semaphore chan struct{}, queue *PushQueue) {
for {
select {
case <-stopCh:
return
default:
//從pushQueue中通過Dequeue()方法獲取每個客戶端connection和對應的PushRequest
client, push, shuttingdown := queue.Dequeue()
if shuttingdown {
return
}
recordPushTriggers(push.Reason...)
// Signals that a push is done by reading from the semaphore, allowing another send on it.
doneFunc := func() {
queue.MarkDone(client)
<-semaphore
}
proxiesQueueTime.Record(time.Since(push.Start).Seconds())
var closed <-chan struct{}
if client.stream != nil {
closed = client.stream.Context().Done()
} else {
closed = client.deltaStream.Context().Done()
}
go func() {
//生成pushEv
pushEv := &Event{
pushRequest: push,
done: doneFunc,
}
select {
//把pushEv傳入每個客戶端的channel中
case client.pushChannel <- pushEv:
return
case <-closed: // grpc stream was closed
doneFunc()
log.Infof("Client closed connection %v", client.conID)
}
}()
}
}
}
在前面介紹 pilot/pkg/xds/ads.go 中 StreamAggregatedResources接受每個客戶端連線請求 時,有如下程式碼
case pushEv := <-con.pushChannel: // 這裡是istiod->envoy
err := s.pushConnection(con, pushEv)
pushEv.done()
if err != nil {
return err
}
這樣 把 pushEv 傳入每個客戶端的 channel 中 後,會呼叫此處的 pushEv := <-con.pushChanne l 獲取到 pushEv 並使用 s.pushConnection(con, pushEv) 進行配置的推送
// Compute and send the new configuration for a connection.
func (s *DiscoveryServer) pushConnection(con *Connection, pushEv *Event) error {
pushRequest := pushEv.pushRequest
//直接進行全量推送
if pushRequest.Full {
// Update Proxy with current information.
s.updateProxy(con.proxy, pushRequest)
}
if !s.ProxyNeedsPush(con.proxy, pushRequest) {
log.Debugf("Skipping push to %v, no updates required", con.conID)
if pushRequest.Full {
// Only report for full versions, incremental pushes do not have a new version.
reportAllEvents(s.StatusReporter, con.conID, pushRequest.Push.LedgerVersion, nil)
}
return nil
}
// Send pushes to all generators
// Each Generator is responsible for determining if the push event requires a push
wrl, ignoreEvents := con.pushDetails()
for _, w := range wrl {
if err := s.pushXds(con, w, pushRequest); err != nil {
return err
}
}
if pushRequest.Full {
// Report all events for unwatched resources. Watched resources will be reported in pushXds or on ack.
reportAllEvents(s.StatusReporter, con.conID, pushRequest.Push.LedgerVersion, ignoreEvents)
}
proxiesConvergeDelay.Record(time.Since(pushRequest.Start).Seconds())
return nil
}
04
總結
文中介紹了 EnvoyXdsServer 的結構以及 EnvoyXdsServer 的啟動流程、怎麼與 envoy 客戶端建立連線,當 Istio CRD 配置、 K8s 服務事件變化後,怎麼監控到事件並把相關配置傳到 EnvoyXdsServer 的 channel 中,如何進行防抖及推送,最後把事件傳到每個客戶端的 connection 中。
點選【閱讀原文】到達Istio網站。
CNCF概況(幻燈片)
掃描二維碼聯絡我們!
CNCF (Cloud Native Computing Foundation)成立於2015年12月,隸屬於Linux Foundation,是非營利性組織。
CNCF ( 雲原生計算基金會 )致力於培育和維護一個廠商中立的開源生態系統,來推廣雲原生技術。我們通過將最前沿的模式民主化,讓這些創新為大眾所用。請長按以下二維碼進行關注。