KAFKA入門:【六】能否深入聊一下ISR的伸縮過程?

語言: CN / TW / HK

大家好,這是一個為了夢想而保持學習的部落格。這個專題會記錄我對於KAFKA的學習和實戰經驗,希望對大家有所幫助,目錄形式依舊為問答的方式,相當於是模擬面試。


一、前言

可能有些朋友發現之前空缺了第六節,是因為之前寫了沒來得及完善就一直擱置了,因此在這兒後續補上。


二、概述

我們瞭解ISR列表是不斷伸縮的,在副本失效後及時踢出ISR列表,在副本趕上進度之後重新將副本加入到ISR列表中,後面我們就會按照這個思路來看下其中細節。


三、什麼是失效副本?

功能失效:節點宕機,在該節點上的副本都屬於功能失效副本。
同步失效:follower副本所在的broker因為頻寬或者負載等因素無法及時完成同步,導致被踢出ISR。


四、ISR伸縮控制引數瞭解嗎?

在0.9x版本之前,有一個控制引數:replica.lag.max.messages 預設值為4000,表示如果follower的訊息個數落後leader個數4000,那麼就會被踢出ISR列表;
我們可以想一下這種直接指定條數的方式是否合理呢?顯然是不合理的,原因入下:
高吞吐的場景:瞬間就幾萬條訊息,可能follower就滯後個幾秒鐘就被判定為失效從而被踢出,可能導致ISR列表頻繁的變動,以及元資料的頻繁更新。
低吞吐的場景:可能一天就幾條訊息,那可能follower都滯後好幾天了依舊存在於ISR中,那ISR不就失去意義了嗎?

因此0.9x版本開始,移除了該引數,取而代之的引數是replica.lag.time.max.ms 該引數預設值是10000ms,也就是10s。
也就是說如果follower在10s都沒能追上leader的LEO,就會被認定為失效,從而踢出IS列表。


五、ISR是如何將失效副本剔除的?

我們知道了ISR是如何判定失效副本後,再來看下,到底是怎麼把這個失效的副本踢出去的呢?
1、每個broker在啟動的時候都會啟動兩個定時任務:

  • isr-expiration:定時檢查當前broker上的eader對應的副本失效資訊,也就是看當前Leader的ISR列表中是否存在失效副本,預設執行週期為replica.lag.time.max.ms / 2 = 5s
  • isr-change-propagation:定時檢查記憶體isrChangeSet中是否有新的變更資料,固定執行週期為2.5s

2、判斷副本失效:
isr-expiration任務會根據當前時間now,減去某follower的 lastCaughtUpTimeMs,如果大於replica.lag.time.max.ms值,則說明失效。
lastCaughtUpTimeMs這個值,在follower的LEO與leader的LEO相等時(Leader中維護了follower的LEO資訊),被更新。
也就是說,只有當follower完全追上了Leader才更新,而不是每Fetch一次就更新

關於為什麼不是每次Fetch的時候就更新該值呢?
我們試想一下,如果leader的寫入速率遠大於follower的同步速率,可能leader已經寫了10w條資料了,follower由於網路/負載為原因還在慢悠悠的同步,但是因為Fetch請求是正常傳送的,就每次都更新lastCaughtUpTimeMs值,從而認為該follower是有效的,那這不就導致leader和follower之間在這種場景下存在巨大的資料差異了嘛?從而影響資料的可靠性。

3、這個ISR變化的資訊如何傳遞呢?

  1. 由leader所在的broker的isr-expiration定時任務,去檢查失效副本和更新zk的/state節點資料,同時寫入isrChangeSet
  2. isr-change-propagation去檢查isrChangeSet是否有新增資料,如果有,則往zk中的/isr_change_notification節點下建立子節點。
  3. 而Controller對這個節點有一個Watcher,如果發現新增了子節點,那麼Controller就會重新從zk中獲取到最新的元資料,然後通知所有Broker更新元資料。

從上述過程中,我們還可以知道,實際上這個變更的資料會在記憶體中停留一段時間,假如這個時候我們對應的broker宕機了,那麼不就是改了zk卻沒有讓其他broker更新元資料嗎?
其實不是,因為這種情況下,broker宕機會觸發controller在zk下的brokers/ids下對應的節點被刪除,因此Controller也會讓其他的broker更新元資料,所以無論如何都會更新。

最後我們來總結一下整個ISR剔除的過程
每個leader在啟動的時候都會啟動兩個定時檢查任務,每隔一段時間檢查是否存在失效副本。
如果某個follower的lastCaughtUpTimeMs > 10s那麼就會被判定為失效副本
如果定時任務掃描到存在失效副本時,就會往zk的/state節點下更新最新的ISR列表資料,同時將變更資料寫入到記憶體中的isrChangeSet中。
然後另外一個傳播任務會定時檢查isrChangeSet是否存在需要變更的任務,如果感知到就往zk的/isr_change_notification節點下建立子節點。
最終由Controller感知到節點的變化,然後從zk中獲取最新的元資料,然後通知所有的Broker更新元資料,完成整個ISR列表的資料更新。


六、追趕上的副本是如何重新加入ISR中的?

在看完第五小節之後,第六小節就會顯得非常簡單,無非是需要知道什麼時候一個副本會重新判定為同步副本呢? 那就是:當前失效follower的LEO等於leaderHW的時候,即被判斷可以重新加入ISR。

那麼隨之而來的一個問題就是在哪兒去判斷followerLEO == leaderHW的呢?
這裡和上面的剔除ISR成員不一樣,並不是由定時任務去檢測的,而是在處理完Fetch請求的時候,如果判斷Fetch請求是follower發過來的的(replicaId >= 0),那麼就會去看下當前這個follower的LEO是多少(其實就是Fetch請求帶過來的),是不是趕上了當前的leaderHW,如果是的那麼就執行擴張ISR操作。
擴張ISR操作流程就和上面流程一樣了,先寫zk下的/state資料,然後寫isrChangeSet,最後由Controller感知到資料變化,更新叢集元資料。
我們所需要記住的主要差別點在於,ISR列表的擴張是在Fetch請求的時候去判斷和執行的。


七、整個伸縮過程總結

最後,我們用圖示來加深一點印象。
1、失效副本(圖源:《深入理解kafka》)

2、踢出ISR列表:

3、重回ISR列表:


八、效能優化

我們由上可知,ISR的伸縮是需要涉及到zk和Controller以及各個Broker的元資料更新的,因此如果太過頻繁會造成效能問題。
所以kafka在在判斷ISR伸縮之前,還會判斷兩個條件,以此來降低頻率:

  • 上次ISR集合發生變化距離現在已經超過5s。
  • 上一次寫入zk的時候,距離現在已經超過60s。

如果一個副本剛追上Leader加入到ISR,但是因為短時間內沒有追上LEO,5s之後又被檢查到是失效副本,不是又要被踢出去,要更新元資料,這樣就太頻繁了。 因此就有了上面兩個限制,就起碼給了多60s的讓新加入的follower去追上Leader的LEO。

最後,後續有時間我們結合這一小節具體分析一下原始碼,讓細節更加完善^_^

分享到: