Kafka的Consumer Group Rebalance
什麼是Consumer Group Rebalance?
Kafka Consumer建立的時候都要指定一個組ID(group id),所有組ID一樣的Consumer就組成了一個Consumer Group。對於一個Partition同一時刻只會分配給同一個Group內某一個Consumer,這就是大家熟知的Kafka消費模型。通過這個模型,Kafka的消費者(也就是應用/服務)可以很方便的實現Load Balance、HA和水平擴充套件。簡單說這個模型就相當於現在有M個Partition,N個Consumer,然後把這M個Partition平均分配給N個Consumer,而且分配的時候有個限制條件:一個Partition只能分配給一個Consumer,Consumer Group Rebalance就是在需要的時候去做這個分配工作的,而且它的大原則是儘量保證Partition均衡的分配給組內各個Consumer。
那什麼時候需要,或者說什麼時候會發生Consumer Group Rebalance呢?看前面描述的職責就已經很明確了,當M或者N值發生變化的時候就需要Rebalance了,準確點就是:當Group內的Consumer個數或者Consumer訂閱的Partition個數發生變化的時候就需要Rebalance了。下面列舉一些常見的場景:
- Group內有新的Consumer加入,比如應用擴容了、修改了已有應用的並行度(N值發生變化)
- Group內Consumer個數減少了,比如應用縮容、掛掉、poll超時等(N值發生變化)
- 現有Consumer修改了訂閱的Topic,導致Group內的Partition個數變了(M值發生變化)
- 已訂閱的Topic修改了Partition的個數(M值發生變化)
- ...
上面這些場景有些是主動的,有些是被動的,有些是無法避免的,有些是可以避免的,有些是正常的,有些是程式碼bug導致的...總之,當發現資源(Partition)有變更,需要重新分配以消除“貧富差距”的時候,就會發生Consumer Group Rebalance了。但是資源的分配不論是在現實世界,還是在分散式的世界中,都是一個難題。下面介紹Kafka是怎麼做的。
Rebalance介紹
實質上,Rebalance是一個抽象、通用的資源分配協議,它不光可以用於Partition這種資源的分配,在Kafka中,有多個地方都有使用:
- Confluent Schema Registry:使用Rebalance協議來選主
- Kafka Connect:使用Rebalance協議來給connector分配任務
- Kafka Stream:使用Rebalance協議來給例項分配Partition和任務
網上關於這新老協議的細節講述已經非常多了,這裡就概括性的介紹一下。
Rebalance Protocol
如下圖,Rebalance協議由2部分組成,一部分在Broker端,一部分在Client端:
- Group Membership Protocol (Broker端):主要負責整體協調
- Client Embedded Protocol (Client端) :主要負責具體的資源分配
這裡注意一個細節就是一部分協議是在客戶端的,而且使用者可以按照約定好的協議進行自定義的實現,比如實現一個自己的資源分配方案,後面就會講到。
下面還是以本文討論的Consumer Group Rebalance的應用場景(即Partition資源的分配)來描述。對於每一個Consumer Group,都會有一個Coordinator節點(由某個Broker充當)負責這個Group的資源分配,也就是上面的Group Membership協議其實就是由這個Coordinator節點來實際運作的。假設現在新加入了一個Consumer,看下整個Rebalance過程的步驟:
- 該Consumer給Kafka叢集傳送
FindCoordinator
請求,找到它所屬的Group對應的Coordinator; - 找到後向Coordinator傳送
JoinGroup
請求。該請求會攜帶客戶端(即該Consumer)的一些使用者配置(比如session.timeout.ms
、max.poll.interval.ms
)和一些元資料(比如訂閱了哪些主題等)。 - 收到
JoinGroup
請求後,Coordinator通過心跳響應(Heartbeat
)響應通知組內其它成員要開始Rebalance了。然後其它Consumer像這個新加入的Consumer一樣,也傳送JoinGroup
請求給Coordinator。 - 當Coordinator收到組內所有成員
JoinGroup
請求以後,會給所有成員傳送一個JoinGroup
響應。其中給Group Leader(加入組的第一個成員)傳送的Response裡面包含了成員資訊、資源分配策略等元資料,其它成員則是一個空的Response。這個Leader拿到這些資訊以後,本地計算出分配結果。 - 所有成員向Coordinator傳送
SyncGroup
請求,Leader的請求中會包含自己計算的分配結果,其它成員則是空請求。 - Coordinator將Leader的分配結果通過
SyncGroup
響應傳送給各個成員。如果Consumer實現了ConsumerRebalanceListener
介面,則會呼叫該介面的onPartitionsAssignedMethod
方法。
至此,整個Rebalance過程就結束了,這裡再補充一些細節:
- 上面提到了一個心跳的概念:Consumer內部有一個心跳執行緒定時傳送心跳給Coordinator,以讓Coordinator知道自己還活著。當需要Rebalance的時候,Coordinator會在心跳響應中通知所有Consumer要進行重平衡了,這就是上面提到的通過心跳通知。
- 上面舉的例子是由一個新加入的Consumer觸發了Rebalance。很多其它行為也會觸發,前面已經列舉過常見的場景了。結合現在的流程和心跳知識再細化一下觸發場景,比如當有Consumer正常停止的時候,在結束之前會發送
LeaveGroup
請求給Coordinator;如果是異常停止,Coordinator會通過心跳超時來判斷Consumer已經沒了。當然實際中,可能Consumer其實正常著,只是因為網路原因心跳超時了,或者Consumer裡面沒有及時呼叫poll
方法等。 - 前面提到Rebalance協議分為Broker端的“Group Membership Protocol”部分和Client端的“Client Embedded Protocol”部分,上面Group Leader計算分配方案,就屬於“Client Embedded Protocol”部分的功能。提這個是因為Client這部分的協議除了預設的一些實現外,使用者可以去自定義實現,後面馬上要講到的改進方案Incremental Cooperative Rebalance其實就是在這裡實現的。
再放一個圖(圖片來自於引用文章 From Eager to Smarter in Apache Kafka Consumer Rebalances ,下同):
問題分析
優化之前肯定要先分析清楚現有的問題,才能有針對性的進行優化。其實從前面的介紹我們已經很清楚,Rebalance要做的事情很簡單:將M個資源(Partition/Task/Connector)平均分配給N個成員(Consumer/Instance/Worker),每個資源只能被一個成員擁有。事情本身不難,但難就難在需要在分散式環境中做這個分配工作。分散式環境中在任意時刻,網路可能分割槽、節點可能故障、還存在競態條件(race condition),簡單說就是分散式環境中無法實現可靠的通訊,這讓整個問題複雜化了。
前面介紹了現在的Rebalance開始的時候回收(revoke)所有成員的資源,然後大家一起參與Rebalance過程,等拿到新的資源分配方案,又重新開始工作。具體應用到Partition的分配,就是所有Consumer在傳送 JoinGroup
請求前需要停止從Partition消費,“上交”自己擁有的Partition。這樣當Coordinator收到所有Consumer的 JoinGroup
請求的時候,所有的Partition就處於未分配狀態,此時整個系統達到了一個同步狀態(Synchronization barrier):
所以,在重新分配之前,先回收所有資源其實是為了在不可靠的分散式環境中簡化分配工作。然而,按現在這種方式,在分割槽被回收到收到新的分配方案之前,所有成員都無法工作,即“Stop The World”(借鑑了GC裡面的概念),這也是Rebalance存在的最大的問題。預設Rebalance流程的超時時間為5分鐘,也就是最差情況下,“Stop The World”效果可能持續5分鐘。所以需要針對這個問題進行優化,思路也有兩種:
- 儘量減少Rebalance的發生
- 減少Rebalance中“Stop The World”的影響
社群在2.3版本中同時引入了兩個優化方案: KIP-345: Static Membership 和 KIP-429: Kafka Consumer Incremental Rebalance Protocol 分別按照上述兩種思路進行優化,下面分別介紹。
改進點1:Static Membership
Static Membership主要的優化目標是減少“閃斷”場景導致的Rebalance,即解決的思路主要是儘量減少Rebalance的發生,我們看下是如何優化的。
在每次Rebalance的時候,Coordinator會隨機給每個成員分配一個唯一的ID。然後當有新成員加入的時候,它的ID會是一個空字串 UNKNOWN_MEMBER_ID
,這樣Coordinator就知道它是新加入的,需要進行Rebalance了。Static Membership方案是給Consumer增加了 group.instance.id
選項,由使用者負責設定以及保證唯一性,這個ID會替換原來由Coordinator每次Rebalance隨機生成的ID(隨機生成稱之為“Dynamic Membership”),並且這個ID資訊會加到 JoinGroup
請求中。那這個ID有什麼用呢?
舉個例子:某一刻Consumer應用因為記憶體使用過高,被系統OOM Killer幹掉了,然後很快又被守護程序或者人為啟動起來的。這個時候,如果是以前的情況,Coordinator會認為是有新的Consumer加入,需要進行一輪Rebalance,但如果是Static Membership的情況下,Coordinator通過ID發現這個Consumer之前就有,就不會重新觸發整個Rebalance,而是將快取的之前分配給該Consumer的Partition直接返回給他,這樣就一定程度上避免了因為閃斷導致的Rebalance。
當然,這裡我用了“閃斷”,主要是想表達意外掛掉又很快恢復的情況,更具體點:
- 意外掛掉 :指被kill、網路閃斷等不會主動(或者說沒有機會)給Coordinator傳送
LeaveGroup
請求的場景。因為如果主動給Coordinator傳送了LeaveGroup
請求的話,Coordinator會馬上開始一輪Rebalance。 - 很快恢復: 指在Coordinator檢測到Consumer掛掉之前,就恢復了。具體點說就是在
session.timeout.ms
或者max.poll.interval.ms
時間內就恢復了,否則Coordinator會認為Consumer掛了,開始Rebalance。這裡簡單提一下這兩個配置項。在0.10.0及之前的版本中,心跳是和poll在一個執行緒裡面的,只有session.timeout.ms
一個引數。後來進行了優化拆分( KIP-62: Allow consumer to send heartbeats from a background thread ),心跳是一個單獨的執行緒,poll是一個執行緒,session.timeout.ms
仍然是心跳的超時時間,而max.poll.interval.ms
則是poll執行緒的超時時間。不管哪一個超時,Coordinator都會認為Consumer掛了,需要Rebalance。
如果我們要使用Static Membership特性,需要給Consumer增加 group.instance.id
設定。同時儘量將上面提到的超時時間設定的長一些。但顯然弊端就是Consumer如果真的掛掉且無法恢復的話,Coordinator需要等較長一段時間才能發現,相當於犧牲了一定的可用性。果然沒有免費的蛋糕。
改進點2:Incremental Cooperative Rebalancing
不同於Static Membership,Incremental Cooperative Rebalancing的思路是儘量減少Rebalance中“Stop The World”的時間和範圍。那怎麼做的呢?有這麼幾個關鍵點:
- 所有成員還是會發送
JoinGroup
請求,但這次傳送的時候資源並不會被回收(即不會停止工作),大家只是將自己目前擁有的資源資訊加到元資料裡面,傳送給Coordinator。然後Coordinator把這些資訊傳送給Group Leader,Leader根據這些資訊計算新的分配方案,計算的時候在保證均衡的情況下儘量對現有狀態做最小改動(實際由實現的分配演算法決定,預設的StickyAssianor策略就是這種),換句話說最核心的就是看哪些資源變更了成員,那就需要從原擁有者那裡剔除這個資源,然後加到新的擁有者那裡。 - 然後Coordinator會將新的分配方案按照原有的方式通過
SyncGroup
響應傳送給各個成員。各個成員收到新的分配方案以後,會和自己的現狀做對比,如果沒有變化或者只是新增了資源,則不需要額外做什麼。但如果發現有資源被回收,則繼續Rebalance的流程,接下來的流程和老版本的協議幾乎一樣,也需要回收資源,併發送JoinGroup
請求,但這裡僅回收需要被回收的資源。比如某個ConsumerRebalance之前擁有1、3、5三個分割槽,Rebalance中重新計算的新方案裡面是1、3兩個分割槽,則只回收5。
可以看到Incremental Cooperative Rebalancing是將原有的Rebalance流程進行了細化(分成了多輪),延遲了資源回收的時間和範圍,改進後的Rebalance流程如下圖:
那如何使用Incremental Cooperative Rebalancing呢?通過配置項 partition.assignment.strategy
進行配置,可以配置多個,越靠前優先順序越高。前面提到了Rebalance協議分兩部分,這裡配置的其實就是客戶端“Client Embedded Protocol”的實現類。2.8版本中已經支援的有:
- org.apache.kafka.clients.consumer.RangeAssignor (預設值)
- org.apache.kafka.clients.consumer.RoundRobinAssignor
- org.apache.kafka.clients.consumer.StickyAssignor
- org.apache.kafka.clients.consumer.CooperativeStickyAssignor
我們也可以通過實現 org.apache.kafka.clients.consumer.ConsumerPartitionAssignor 介面來實現自定義的Assignor。如果想使用Incremental Cooperative Rebalancing,就配置最後一個CooperativeStickyAssignor即可。不同Assignor的細節本文就不展開了,另外規劃了一篇文章《 Kafka的消費者分割槽分配策略 》。更多關於Incremental Cooperative Rebalancing的細節,可以參考本文引用部分的文章:
- Incremental Cooperative Rebalancing in Apache Kafka: Why Stop the World When You Can Change It?
- From Eager to Smarter in Apache Kafka Consumer Rebalances
總結
Kafka中的Rebalance本質上是解決分散式環境中資源分配的一種通用協議,由於分散式環境的複雜性,無法實現一個完美的方案,只能根據具體的場景進行有針對性的優化。比如實際中“閃斷”是引起Rebalance的一種很常見且無法避免的原因,所以就有針對性的增加了Static Membership方案。另外Rebalance很嚴重的一個問題就是會“Stop The World”,然而實際中Rebalance的時候其實往往只需要變更極少量的資源所屬權,所以就提出了Incremental Cooperative Rebalance方案,減少了Rebalance過程中“Stop The World”的時間和影響範圍。好的架構不是設計出來的,而是進化而來的,Kafka Rebalance優化的腳步仍在繼續。
另外,儘管現在已經做了諸多優化,效果也比較明顯,但Rebalance仍然算是一個代價比較大的操作,實際應用的時候,我們還是要能避免的就避免。
References:
- 整合學習介紹(4)——GBDT&XGBoost
- 整合學習介紹(3)——Random Forest
- 整合學習介紹(2)——AdaBoost
- 決策樹介紹
- 整合學習介紹(1)——Boosting && Bagging
- 如何修改Pandas中列的型別
- 從數倉到資料湖,再到Data LakeHouse
- Kafka的消費者分割槽分配策略
- Kafka的Consumer Group Rebalance
- Kafka的監聽地址配置
- Kafka的Producer
- Flink快速瞭解(6)——常用運算元(Operator)
- Filebeat原始碼淺析
- Filebeat導致檔案無法被刪除的原因分析
- Java內部類
- Kafka的儲存
- Kafka的擴容和縮容
- Skywalking流程簡析及原始碼除錯
- OpenTracing概念術語介紹
- OpenTracing Java Library教程(4)——Baggage介紹