Kafka 架構、核心機制和場景解讀

語言: CN / TW / HK

我們來自位元組跳動飛書商業應用研發部(Lark Business Applications),目前我們在北京、深圳、上海、武漢、杭州、成都、廣州、三亞都設立了辦公區域。我們關注的產品領域主要在企業經驗管理軟體上,包括飛書 OKR、飛書績效、飛書招聘、飛書人事等 HCM 領域系統,也包括飛書審批、OA、法務、財務、採購、差旅與報銷等系統。歡迎各位加入我們。

本文作者:飛書商業應用研發部 張博

歡迎大家關注飛書技術,每週定期更新飛書技術團隊技術乾貨內容,想看什麼內容,歡迎大家評論區留言~

摘要

Kafka 是一款非常優秀的開源訊息引擎,以訊息吞吐量高、可動態擴容、可持久化儲存、高可用的特性,以及完善的文件和社群支援成為目前最流行的訊息佇列中介軟體。

Kafka 的開發社群一直非常活躍,在訊息引擎的領域取的不俗成績之後,不斷拓展自己的領域,在基於事件的流處理平臺方向一直髮力,不斷自我更新迭代力圖成為這個領域內的事實標準。

Kafka 的訊息引擎功能十分強大,但是一直沒有停下自我突破的腳步,隨著 3.0 版本的中 KRaft 協議的推出,Zookeeper 的退出程序正式啟動,Kafka 開始了又一次的自我蛻變。

ZK 的移除是一個非常大膽的舉動,因為 ZK 在 Kafka 的叢集管理中居於核心的地位,不會輕易取代,那為什麼 Kafka 選擇了自行實現選舉機制的路線?

此外,雖然 Kafka 具備諸多優秀的特性,這些如今被視為最佳實踐的特性也是不斷演化而來的,從其不斷升級改進的過程中也能間接反映出生產環境所面臨的現實問題,那麼 Kafka 在實際的生產環境中的表現究竟如何?

作為業務方,使用 Kafka 作為訊息中介軟體進行業務開發,保證服務平穩執行需要避開哪些雷區?

這篇文件將從一個比較高的視角,從 Kafka 的設計理念、架構到實現層面進行深入解讀,隨著對 Kafka 相關機制的深入瞭解,這些問題的答案將浮出水面。

須知事項

  • 這篇文件基於 Kafka 最近剛剛釋出的 3.2 版本原始碼為基礎進行介紹,主要討論 Java 和 Scala 語言實現的原版客戶端和服務端,其他語言版本的客戶端與這篇文件介紹的機制在實現上會有較大出入,需要留意
  • 此外,位元組的業務很多使用的都是自研的 BMQ [3],在客戶端協議上是完全相容的,但是服務端進行了完全的重構,本文介紹的相關服務端機制並不適用
  • Kafka 整個專案包括 Core、Connect、Streams,只有 Core 這一部分是我們通常說的核心訊息引擎元件,另外兩個都是基於這個核心實現的上層應用,這篇文章主要介紹的就是 Kafka Core 相關的內容,下面的 「Kafka 的應用架構部分」會對這一點做簡要介紹

名詞對照

下面的表格給出了 Kafka 中出現的一些高頻和重要概念的對照解釋

| 英文名 | 中文名 | 解釋 | 備註 | | --------------------- | ---------- | ------------------------------------------------------------------------------ | ---------------------------------------------------------------------------------------------- | | KIP | Kafka 改進提案 | KIP(Kafka Improvement Proposal)是針對 Kafka 的一些重大功能變更的提案,通常包括改進動機、提議的改進內容、介面變更等內容 | | | Partition | 分割槽 | 一個獨立不可再分割的訊息佇列,分割槽中會有多個副本儲存訊息,他們的狀態應該是一致的 | Kafka 分割槽副本的同步機制不是純非同步的,有高水位機制去跟蹤從副本的同步進度,並且有對應的領導者副本選舉機制保證分割槽整體對外可見的訊息都是已提交的 | | Replica | 副本 | 分割槽中訊息的物理儲存單元,通常對應磁碟上的一個日誌目錄,目錄中會對訊息檔案進一步進行分段儲存 | | | Leader Replica | 主副本、領導者副本 | 指一個 Partition 的多個副本中,對外提供讀寫服務的那個副本 | Kafka 叢集範圍有對等地位的元件是 Controller | | Consumer | 消費者 | Kafka 客戶端消費側的一個角色,負責將 Broker 中的訊息拉取到客戶端本地進行處理,還可以使用 Kafka 提供的消費者組管理機制進行消費進度的跟蹤 | | | Consumer Group Leader | 消費者組領導者 | 通常指 Consumer Group 中負責生成分割槽分配方案的 Consumer | 這個概念非常容易和上面的 Leader Replica 混淆 | | Log start offset | 訊息起始偏移 | Log start offset,Kafka 分割槽訊息可見性的起點 | 此位置對應一條訊息,對外可見可以消費 | | LSO | 上次穩定偏移 | Last stable offset,和 Kafka 事務相關的一個偏移 | 當消費端的引數isolation.level 設定為“read_committed"的時候,那麼消費者就會忽略事務未提交的訊息,既只能消費到LSO(LastStableOffset)的位置 | | LEO | 訊息終止偏移 | Log end offset,Kafka 分割槽訊息的終點 | LEO 是下一條訊息將要寫入的位置,對外不可見不可供消費 | | HW | 高水位 | High water mark,用於控制訊息可見性,HW 以下的訊息對外可見 | HW 的位置可能對應一條訊息,但是對外不可見不可以消費,HW 的最大值是 LEO | | LW | 低水位 | Low water mark,用於控制訊息可見性,LW 及以上的訊息對外可見 | 一般情況下和 Log start offset 可以等價替換,程式碼裡也是這個邏輯 | | ISR | 已同步副本 | In sync replica 指滿足副本同步要求的副本集合,包括領導者副本 | 副本同步是按照時間差進行判定的,而非訊息偏移的延遲 |

Kafka 的應用生態

下面這張是我根據 Confluent 部落格的一張資料圖重繪的 Kafka 應用生態架構圖,在正式開始介紹本文的主題之前,我們先了解一下 Kafka 的整個應用生態

這張圖中居於核心地位的是 Kafka Core 的叢集,也是我們常用的訊息引擎的這部分功能,是我們這篇文件重點介紹的物件

在核心的周圍,第一層是 Producer 和 Consumer 的基礎 API,提供基礎事件流訊息的推送和消費

而基於這些基礎 API Kafka 提供了更加高階的 Connect API,能夠實現 Kafka 和其他資料系統的連線,比如訊息資料自動推送到 MySQL 資料庫或者將 RPC 請求轉換為事件流訊息進行推送

此外,Kafka 基於自己的訊息引擎打造了一個流式計算平臺 Streams,提供流式的計算和儲存一體化的服務

Kafka Core 架構

Kafka Core 架構部分的解讀從模型、角色和實體、典型架構三個方向層層遞進進行介紹

訊息模型

Kafka 的訊息模型主要由生產消費模型、角色和實體,以及實體關係構成,前者表示了訊息的生產消費模式,後者描述了為了實現前者,其內部角色和實體存在怎樣的邏輯關係

基本訊息生產消費模型如下圖所示:

流程圖 (3).jpg

圖中展示了一個非常基本的生產消費場景,生產端向佇列尾部發送訊息,消費端從佇列頭部開始消費

從左往右看分別是消費端、訊息佇列、生產端,這三塊我們分開進行詳細介紹

消費端

在消費端有眾多消費者,它們之間用消費者組關聯起來

注意圖中 Consumer 0 是沒有分配到分割槽進行消費的,因為消費者組主要起個負載均衡的作用,一個分割槽被兩個消費者消費從業務視角來看就是在重複消費了

對已經分配到分割槽的消費者來說,消費從佇列的頭部開始,在 HW 前結束

訊息佇列

訊息佇列處於整個訊息模型中心的地位,是連線生產端和消費端的樞紐,Kafka 在效能優化上做的工作最多的就是這一個部分

因為 Kafka 的訊息儲存是佇列的資料結構,只允許訊息追加寫入,這樣的設計能最大化利用現有持久化儲存介質的寫入效能(SSD 和 HDD 都存在順序寫入效能遠大於隨機寫入的特性),實現訊息佇列的高吞吐量

此外,Kafka 的佇列還設計了高水位機制,避免未被從副本完成同步的訊息被消費者感知並消費

生產端

生產端的 Producer 持續傳送訊息到佇列中,訊息追加到佇列尾部,通過指定分割槽演算法可以決定訊息發往 Topic 下的哪個分割槽

小結

Kafka 的整個訊息模型還是基於經典的訊息模型去設計和改進的,訊息模型的設計還是非常簡潔易懂的,它的創新和優勢就是在於將這一套模型用分散式的多機模式實現出來,能支撐住大併發、高吞吐量場景下的低時延、高可用的業務需求

當然這套模型之下,還有一些比較小的話題值得去討論,我這裡選了兩個話題展開敘述來結束這一節

Push vs Pull

在 Kafka 定義的訊息模型中,消費端是通過主動拉取訊息的方式來消費的,但是與之對應的還有訊息推送模型,Broker 對生產者推送過來的訊息進行主動分發和推送到消費端

直覺上我們會覺得這種方式很自然,或者認為這是訊息引擎的唯一正規化,但是實際上關於為什麼選擇 Pull 的方式來進行消費,Kafka 的官方文件中關於這部分設計有專門列出來,主要討論的點是訊息消費的流控策略應該放在 Broker 端還是 Consumer 端,感興趣的可以去閱讀一下 Apache Kafka Documentation

零拷貝(Zero-Copy)

零拷貝從廣義的角度來看不是一種具體的技術實現(僅指作業系統實現的零拷貝機制),而是一種優化思想或者技巧,針對程式執行中不可變的資料或者不可變的部分儘量減少或者取消記憶體資料的拷貝,用記憶體地址去引用這些資料

Kafka 的訊息佇列的核心功能就是進行各種資料的 IO 和轉發(IO 密集型應用),零拷貝帶來的收益非常明顯:

  • 減少了 JVM 堆記憶體佔用,降低了 GC 導致的服務暫停和 OOM 風險
  • 減少了大批量頻繁記憶體拷貝的時間,能大幅優化資料吞吐效能

所以很有必要進行這樣的優化

Kafka 的例項是執行在 JVM 裡的,零拷貝的技術落地也離不開 Java 執行時提供的環境,具體到實現上主要依賴 Java 提供的 FileChannel 去對映檔案

針對訊息拉取消費的場景,直接將日誌段 FileChannel 中對應偏移和長度(Kafka 的日誌段都有對應的索引檔案,所以不需要讀取原始訊息日誌段檔案就能拿到這些資訊)的資料傳送到網路棧,規避應用層的資料拷貝中轉

針對訊息推送生產的場景,從網路棧讀取出來處理好的訊息直接從記憶體 Buffer 中向 FileChannel 寫入追加,當然這個場景並沒有實現嚴格意義上的零拷貝(JVM 堆記憶體存在於使用者空間,寫入檔案中必須要拷貝到核心),只不過 Kafka 用了 MemoryRecords 這個類基於 Buffer 去管理記憶體中的訊息,規避了使用物件結構的方式管理可能存在的記憶體拷貝和資料序列化行為(這個優化的思路和 String 以及 StringBuilder 一致)

這裡只是以場景的例子提供一些分析零拷貝實現機制的視角(系統原生支援 + 處理邏輯層面優化),零拷貝單獨展開也是一個很大的話題,總體來講就是在各個環節儘可能減少記憶體拷貝的次數,提高資料讀寫效能

角色和實體

在 Kafka 對上述訊息模型的實現中,定義了一系列負責執行的角色和表達資料結構的實體,每個角色和實體都有其對應的責任邊界,這些角色和實體之間共同配合完成整個訊息引擎的運作

Kafka 中有這麼一些比較重要的角色和實體:

  • Broker 是一個獨立的 Kafka 服務端例項,是最大的實體範圍,其他角色的例項都通過物件成員的形式引用進來,自身不負責請求的處理邏輯
  • Controller 是整個 Kafka 叢集的管理者角色,任何叢集範圍內的狀態變更都需要通過 Controller 進行,在整個叢集中是個單點的服務,可以通過選舉協議進行故障轉移
  • Replica 是一個獨立的訊息佇列實體,負責訊息在物理層面上的儲存
  • Partition 是邏輯層面的“佇列”實體,實際上是一組 Replica 的集合
  • Topic 是 Partition 的實體集合
  • Producer 是訊息生產者角色,會發送訊息到對應主題的分割槽中,寫入到 LEO 的位置去
  • Consumer 是訊息的消費者角色,能消費到 Partition 對外可見的訊息
  • Consumer Group 是 Consumer 的集合實體,並對應一組管理機制用來協調 Consumer 的消費
  • Group Coordinator 是 Broker 中一個負責管理對應消費者組元資料的角色,比較重要且熟知的功能就是負責消費進度的管理

雖然這裡已經列舉了比較多的角色和實體定義,但是 Kafka 中定義的角色和實體遠不止列舉的這些,不過大部分都不是本文需要介紹的相關內容,就不在這裡一一列舉了

上面我們已經瞭解了 Kafka 訊息引擎部分的一些設計抽象層面的知識,下面從 Kafka 的實現角度深入介紹一下上面出現的一些角色和實體

Broker

這一節開始對 Broker 的簡介中的定義是一個 Kafka 服務端例項,如果進一步追問這個例項是什麼,在程式碼中如何體現的話,答案就是 KafkaServer,這是個繼承了 KafkaBroker 的實現類

服務端程序啟動的入口就在這裡,此外一些簡單的請求可以直接在 KafkaServer 中處理掉,比如一些讀取元資料相關的請求就不需要進入其他角色的邏輯中處理了,直接讀取資料組裝結構體返回即可

不過我們以架構視角去看 Kafka 的話不需要這麼具體,就抽象地把它看做服務端例項即可

Controller

Controller 是 Broker 中對 Kafka 叢集來說非常重要的一個角色,負責叢集範圍內的一些關鍵操作:

  1. 主題的新建和刪除
  2. 主題分割槽的新建、重新分配
  3. Broker 的加入、退出
  4. 觸發分割槽 Leader 選舉

每個 Broker 裡都有一個 Controller 例項,多個 Broker 的叢集同時最多隻有一個 Controller 可以對外提供叢集管理服務,Controller 可以在 Broker 之間進行故障轉移

Controller 承擔的責任在我們眼裡更像是叢集的 Leader,不過在 Kafka 的其他地方也出現了 Leader 這個角色,避免混淆還是先記住 Controller 也是叢集中的重要角色吧

Partition

流程圖 (4).jpg

Partition 是一個獨立的訊息佇列,從資料結構的角度看可以理解為一個用陣列實現的佇列,起點是 Log start offset,此偏移會隨著訊息過期時間等配置的影響,逐漸向右移動

HW 是已提交訊息的可見性的邊界,僅在此偏移之下的訊息對外是可見的(注意,不含 HW 本身),該偏移的移動和 Kafka 的副本同步機制緊密關聯,下面會專門介紹此機制

Log start offset 和 HW 共同配合,形成了已提交訊息的可見範圍,需要注意的是受 Broker 的訊息過期清理配置的影響,從副本的 Log start offset 的值通常小於等於領導者副本的 Log start offset,可見範圍同樣會因此縮減

LEO 是訊息佇列的終點,下一條訊息將在這個地方寫入,同時 HW 的最大值就是更新到這裡

LW 的作用不是很大,因為分割槽的 Leader 副本一旦初始化完成,其 Log start offset 的值更新機制就是 LW 的更新機制,兩者可以等價替換

上面說的這幾個偏移的管理主要和 Kafka 的副本管理機制相關,尤其是 HW 更新機制,因為訊息資料需要在多個副本之間同步,所以需要這樣的機制來管理資料同步的進度

Topic

一個 Topic 就是一組 Partition 的集合,效果相當於是給一組 Partition 做了個命名,唯一提供的實際功能應該就是增加集合中的 Partition 數量

值得注意的是,先前版本的 Kafka 中僅使用 Topic 名稱作為識別符號去區分不同的 Topic,但是新版本中加入了 UUID 去進行判斷 KIP-516 ,主要是為了解決刪除、新建重名的 Topic 場景下的一些問題

Producer

Producer 是無狀態的(不使用事務機制的情況下),和 Partition 之間是多對多的關係

Producer 可根據分割槽演算法自行決定一條訊息應該發往哪個分割槽,該機制會在下面的文章中進行簡要分析

Consumer

Consumer 是有狀態的(不使用消費者組靜態成員或者不使用無消費者組機制的情況下),這個狀態以 Consumer Group 為單位進行維護

和 Consumer 自身關係比較大的應該就是訊息消費偏移提交機制了,這個功能在服務端 0.9.0 版本之前的實現是用 ZK 來儲存的,但是後面版本中 Kafka 開始用內部主題來持久化訊息偏移了

Consumer Group

消費者組是 Kafka 中的一個重要實體了,因為消費者組不僅僅是一個消費者的集合,而是以 Group 為中心輻射出一組消費的的管理機制:

  • 分割槽分配方案,由消費者組選舉出的消費者 Leader 執行生成,Coordinator 負責分發
  • 消費者加入、退出機制,由 Coordinator 負責協調執行
  • 消費者組消費進度管理,由 Coordinator 負責持久化管理

小結

這一節只從叢集大的視角列舉了一些比較重要的角色和實體,在後面的介紹中會有更加細分的角色和實體的深入介紹

通過對各個角色和實體的概念和職責建立起清晰認知,對我們理解 Kafka 的叢集架構設計、機制原理、問題定位有很大的幫助

角色和實體關係

Kafka 中的角色和實體概念比較多,我這裡梳理了一下比較核心的這些角色和實體之間的對應關係,能更好地幫助理解這些概念

流程圖 (5).jpg

注意上面關係圖中,Controller 和其他物件之間的關係描述的是管理視角的,而非物件實體的具體包含關係

因為從物件實體的包含關係上說,Controller 和 Broker 之間是一對一的關係,但是這樣的關係描述沒有實際意義

叢集架構解析

一個具有代表性的 Kafka 叢集通常具備 1 個獨立的 ZK 叢集、3 個部署在不同節點的 Broker 例項,這裡我以一個這樣的典型叢集的為例來介紹 Kafka 的整體架構,叢集情況如下:

  • 3 Broker、多個 Consumer(屬於某個消費者組)、多個 Producer、1 AdminClient
  • 1 Topic、1 Partition(Leader 副本在 Broker 1 上)
  • 當前 Controller 位於 Broker 0 上
  • Consumer 所屬消費者組的 Coordinator 位於 Broker 0 中

架構圖如下所示:

流程圖 (6).jpg

下面我將結合上面的架構圖,從叢集管理、消費、生產這幾個大的視角來解讀一下

叢集管理

叢集管理是一個重要命題,因為 Kafka 叢集需要管理大規模的 Broker 例項、消費者、生產者還有主題分割槽的訊息日誌資料

ZK 事件監聽

叢集管理的工作主要是由 Controller 來完成的,而 Controller 又通過監聽 Zookeeper 節點的變動來進行監聽叢集異動事件

Controller 進行叢集管理需要儲存叢集元資料,監聽叢集狀態異動情況並進行處理,以及處理叢集中修改叢集元資料的請求,這些工作主要都是通過 Zookeeper 來實現的

當前示例叢集中是 Broker 0 的 Controller 正在負責管理,監聽 ZK 中的相關節點異動情況,而其他 Broker 中的 Controller 處於備用狀態,監聽 /controller 節點準備下一輪選舉

ZK 目錄結構

我梳理了一下 Kafka 在 Zookeeper 中的目錄結構,因為沒有實測 Kafka 的所有叢集功能,所以末級節點可能不完整有缺失,但是重要比較核心的 ZNode 我都覆蓋到位了

梳理出的 ZNode 樹狀結構圖如下:

image.png

除了節點的名稱,節點中還有 Kafka 序列化的 JSON 資料,部分節點的資料結構如下:

  • Partition 相關節點的值

/brokers/topics/test-topic/partitions/1/state

JSON { "controller_epoch":1, "leader":0,// leader 副本所在的 broker id "version":1,// 程式碼裡硬編碼的一個值,始終是 1 "leader_epoch":0, "isr":[// 已同步副本集合,Leader 副本包含在內 0, 1, 2 ] }

  • Controller 相關節點的值

/controller 是個臨時節點,Session 超時過期等原因會導致此節點被刪除

JSON { "version":1,// 程式碼裡硬編碼的一個值,始終是 1 "brokerid":0,// 當前是 Controller 的 Borker ID "timestamp":"1649930940915" }

/controller_epoch 是個永久節點,資料會持久化儲存

JSON 1 // 值就是個遞增的數字,表示選舉週期

  • Topic Config 相關的值

此節點用於儲存 Topic 級別的動態配置

/config/topics/test-topic

JSON { "version":1,// 程式碼裡硬編碼的一個值,始終是 1 "config":{ "min.insync.replicas":"2" // topic 級別的配置 } }

叢集管理請求轉發

如上面的架構圖所示,Broker 2 收到了 AdminClient 傳送過來的 CreateTopicRequest 請求,並沒有進行處理,而是轉發到了 Controller

碰到這類叢集管理的請求,Broker 都先對自身狀態進行判定,不是 Controller 的情況下會對滿足要求的請求進行轉發

通常是對於叢集狀態有修改的請求會進行轉發,對於讀取叢集狀態的請求則通過本地的元資料快取來處理

副本管理

上面介紹的典型叢集架構只有一個分割槽,三個副本,這裡拓展成三個分割槽,九個副本的叢集來簡要介紹一下 Kafka 副本管理的模式

流程圖 (7).jpg

前面的訊息模型和架構圖中裡我們已經瞭解了分割槽、副本的概念,這裡通過上面這張圖梳理一下分割槽和副本在叢集中的分佈關係

首先,同一個分割槽有多個副本,這裡設定的是 3 個,儘可能均勻分佈在 Broker 中

其次,分割槽副本里有一個 Leader Replica,也就是領導者副本,負責分割槽訊息資料的讀寫,所以領導者副本在 Broker 之間也需要均勻分佈,這樣才能保證負載均衡

結合上面的圖例,有幾個點需要注意:

  1. 副本自身是沒有專門的編號的,副本在哪個 Broker 上,對應的 Broker ID 就是它的編號(這裡也間接限制了副本數量的最大值必須小於 Broker 節點數量)
  2. 我這裡舉例用綠色表示了從副本,假定的是已同步的狀態,實際場景中會存在從副本未同步完成的情況

讀寫分離

讀寫分離是關於 Kafka 副本管理的一個熱點話題,Kafka 目前是支援消費從副本訊息資料的,KIP-392 的提案就是關於這個機制

但是從上面的圖中我們也可以看出,你讀取的從副本所在的 Broker 也是另一個分割槽的領導者副本所在的位置,大多數場景下使用這個功能只會導致熱點 Broker 的出現,並承擔資料同步延遲的代價,並不能達到我們減輕領導者副本負載的目的

這個提案改進這個小功能點主要是為了解決跨資料中心/機架部署的場景下,儘可能從本地資料中心消費資料,降低網路通訊的成本開銷,提高資料吞吐量

另外一點需要注意的是,Broker 的分割槽副本同步只能從領導者副本消費訊息進行拉取,無法從其他從副本獲取資料,支援讀寫分離的是客戶端消費者

消費

消費的流程我們只討論的是使用了 Kafka 提供的消費者組管理機制的消費者,對於手動管理消費進度的情況這裡不予討論

訊息消費的大體流程是:

  1. 連線到任意 Broker,獲取叢集元資料
  2. 通過上一步的元資料,找到自己所屬 Coordinator 所在的 Broker
  3. 加入消費者組,獲取分割槽消費方案
  4. 獲取相關分割槽消費進度,從上次消費的地方開始繼續拉取訊息,同時本地儲存消費進度
  5. 非同步提交分割槽本地消費進度到 Coordinator

上面的架構圖中藍紫色的箭頭展示了這樣的訊息消費的流程順序

採用這樣的流程去消費資料和 Kafka 的架構也是有密切聯絡的,因為消費的資料通過分割槽分佈在整個叢集的 Broker 中,所以需要獲取整個叢集的元資料瞭解自己需要獲取的分割槽資料所在位置

同樣在消費側因為用了消費者組去進行負載均衡和容災,所以消費者之間需要進行溝通、協調消費方案,但是消費者也是分散式執行的例項,所以需要 Broker 提供 Coordinator 這樣的中介在消費者之間架起溝通的橋樑

併發性

消費側的併發性需要考慮兩個問題:

  1. 訊息拉取到客戶端
  2. 訊息偏移的提交和獲取

前者支援併發,但是後者則不然

從程式碼上看,同一個消費者組的消費進度是沒法併發提交的,有加可重入鎖保護消費者組的元資料物件,每次寫入的時候都需要先獲取到鎖

Go // 針對消費者組元資料的很多操作都是在臨界區中完成的 group.inLock { ... }

更加反直覺的是,消費進度的讀取操作也是同樣的一把鎖保護,無法併發獲取,具體原因不詳,但是此鎖的作用可能是:

  1. 保護正在使用中的消費者組不被刪除
  2. 消費進度出現變動(偏移過期被刪除、分割槽擴容有新分割槽進度加入等),等待其他操作完成再執行

總的來講針對一個消費者組的幾乎所有操作都不支援併發(讀寫都是),主要目的可能就是為了保護正在使用的資源不被意外刪除

生產

訊息生產的大概流程是:

  1. 連線到任意 Broker,獲取叢集元資料
  2. 傳送訊息到指定的分割槽 Leader 副本所在的 Broker
  3. 其他 Broker 上的副本向 Leader 副本同步

上面的架構圖綠色箭頭展示了這個流程

在這個流程中訊息是通過叢集元資料的提示,發往對應分割槽 Leader 副本所在的 Broker 上的,注意這裡不允許訊息在 Broker 之間進行轉發

併發性

一句話總結:同一個 Topic 不同分割槽之間是支援併發寫入訊息的,同一個分割槽不支援併發寫入訊息

這很好理解,單個分割槽是臨界資源,需要用鎖來進行衝突檢測保證同一時間只有一批訊息在寫入避免出現訊息亂序或者寫入被覆蓋的情況

小結

這一節的架構解析選取了 Kafka 叢集中比較重要的幾個角色和主流程來進行講述,可見為了實現前面所描述的基本訊息模型,需要一系列的管理機制協調、進行資料同步,還有容災機制保障整個叢集的有效執行

在早期的 Kafka 版本中,對 ZK 形成了強依賴,客戶端都是通過直連 ZK 的方式去獲取叢集配置和更新自己的狀態,不過後面的版本中逐步進行了抽象層隔離和解耦,現在需要客戶端直接和 ZK 互動的地方已經沒有了,都是和 Broker 打交道

這樣的依賴解耦帶來了簡潔的介面抽象,降低了技術上的門檻,同時將部分職責從 ZK 轉移到 Kafka 還提升了服務的效能

另外,目前存在的所有需要通過 ZK 去幹預 Kafka 叢集行為的方法,都可以通過 Admin API 或者其他介面去進行干預,這種早期需要 Hack 的暴力干預方式已經被完全淘汰

總結

這一部分介紹的是 Kafka 的架構,按照通常的分析思路應該先用 CAP 理論對此係統做一個定性(CP AP CA?),然後再繼續展開介紹

但是我這裡並沒有急於給出這樣的定性“結論”,究其主要原因是我認為這樣的定性描述其實不夠精準,很容易使我們陷入一種定式思維去看待 Kafka 並使得我們忽視了隱藏在其內部的一些細節

既然這篇文章就是對 Kafka 內在架構和機制的拆解和解讀,一些細節不可忽視,所以我們繼續深入探究一下 Kafka 再去討論這個問題就能形成一個比較完整的看法了

核心機制

整個 Kafka Core 中權重最大、使用頻率最高的三個角色是 Broker、Producer 和 Consumer,這幾個角色的使用和我們的業務開發也是息息相關,對這些角色的核心機制進行深入瞭解對後續的業務開發、故障排查是有很大幫助

Broker

Broker 端是 Kafka 整個內部處理流程最複雜的元件了,這當中的機制沒有辦法一個一個列舉出來詳細說,我這裡選擇了 Controller 和 Broker 管理機制,還有副本管理中的高水位機制來進行介紹

之所以選擇這幾個機制進行解讀,是因為他們對幫助理解叢集故障轉移過程中的行為、影響面有很大幫助,其他機制都是圍繞著這些核心的一些外圍機制,是一些輔助角色

Controller 選舉

我下面畫了一張叢集故障轉移的圖,描述的是 Controller 因網路、硬體故障等原因下線,整個叢集重新選舉 Controller 的過程

流程圖 (8).jpg

如圖所示,整個 Controller 選舉的過程分四個階段進行:

  • 階段 1:因為 Controller 和 Zookeeper 之間的會話因為超時、網路連線斷開等原因失效,導致臨時節點 /controller 被刪除
  • 階段 2:Broker 1 和 Broker 2 監聽到了 /controller 刪除的事件,觸發了 Controller 的重新選舉
  • 階段 3:Broker 1 成功建立 /controller 節點並寫入資料,Broker 2 檢測到了新寫入的 /controller 資料中止選舉
  • 階段 4:Broker 1 作為 Controller 初始化完成,向叢集中的其他節點發送更新叢集元資料的請求,同步最新的資料

這個過程稱之為「選舉」其實有些不合適,因為這裡其實基於鎖的一種選主機制,先搶到鎖的獲得資源使用權,因為後面 Kafka 推出了基於 KRaft 選舉協議的 Controller,所以這裡想做一些特別說明

注意,Controller 的選舉之後往往伴隨著 Broker 的下線,因為 Controller 的重新選舉一般就是 Broker 失效引起的,下一節會介紹這其中的相關機制

Broker 上線下線

線上的 Controller 通過監聽 /brokers 節點的異動情況處理 Broker 的上線、下線事件,這裡梳理了一下整個事件處理的流程

流程圖 (9).jpg

整個處理的流程還是比較清晰的,分支不多,值得注意的點有幾個:

  1. 異動資料是通過比對 Controller 中的元資料和 ZK 的資料差異計算出來的
  2. 這是個非同步處理流程,在 ControllerEventManager 中用佇列進行了解耦
  3. 針對 bouncedBroker 的處理方式是先移除,再新增
  4. KafkaController 中的 onBrokerStartup 方法執行了 Broker 上線後的存在 新增/離線 副本的分割槽進行領導者選舉

Broker 的異動在叢集中是一個非常重要的事件,因為其影響到了叢集整體的可用性:

  • Coordinator 需要轉移到其他 Broker 上,否則與之繫結的消費者組無法正常執行,且轉移期間消費者組無法正常消費
  • 分割槽副本,尤其是領導者副本需要在 Broker 中重新分佈,並且會觸發分割槽領導者副本選舉

上面兩點我認為是叢集 Broker 異動過程中比較核心的地方,因為 Controller 端處理完成 Broker 的元資料變更,後面的更新機制都是圍繞這兩個點進行

高水位更新

高水位是 Kafka 設計的一套用於跟蹤從副本非同步複製進度和保證資料一致性的機制

在架構部分簡要說了一下 Kafka 的副本管理中副本資料的分佈情況,這裡進一步介紹一下對一個分割槽來說,是如何通過高水位管理資料同步進度的

這裡我們用一個三副本的分割槽的場景來介紹該場景下高水位的值是如何更新到 4 的,如下圖所示:

流程圖 (10).jpg

注意:

  • 為了方便討論,這裡假設三個副本始終都在 ISR 中
  • 已寫入領導者副本的訊息在寫入時均滿足最小已同步副本要求

更新規則

在分析這個更新流程之前,我們先明確一下更新規則:

  1. 高水位的值就是遠端副本狀態中遠端 LEO 的最小值,注意這裡不判定 ISR 是否滿足最小已同步副本要求
  2. 從副本同步時拉取訊息的起始偏移,會被記錄為此副本在 ISR 中的遠端 LEO
  3. 從副本拉取訊息時,返回資料中包括當前最新的高水位值

整個高水位的更新流程都是基於上面這三條規則去執行的,這三條規則一起看能有點眼花繚亂,總結一下就是每次從副本發起訊息同步請求的時候幹兩件事:

  1. 上報自己的拉取訊息起點,領導者副本將其當做 LEO
  2. 獲取領導者副本的 HW 用於更新同步本地的 HW

流程解讀

現在來看下這三條規則是如何應用的,更新流程如下:

  • 階段1:副本 0 和 2 訊息都完全同步,僅副本 1 存在 2 條訊息的延遲,這時候副本 1 發出同步請求,遠端副本狀態中對應的遠端 LEO 更新為 4,本地 LEO 更新為 5
  • 階段2:因為遠端副本狀態中的遠端 LEO 發生變化,領導者副本的高水位更新為 4,隨後從副本 2 發出同步請求,獲取到了最新的高水位 4 並更新本地值,LEO 不發生變化
  • 階段3:從副本 1 繼續發出同步請求,遠端副本狀態的遠端 LEO 此時被更新為 5,請求返回後獲取到了最新的高水位 4 並更新本地值,同時遠端 LEO 的更新引起領導者副本 0 高水位的變化,更新為 5,隨後從副本 2 通過同步請求獲取到了變化後的值,高水位也隨之更新為 5

後續重複以上流程,最終所有副本的高水位和 LEO 都會更新到 5

小結

這部分我們介紹的是 Broker 端高可用、一致性方面的機制,其實服務端還要很多優秀機制的實現值得繼續深入挖掘和學習,比如主題分割槽副本的繫結機制、日誌檔案的管理等

文件後面的【學習資源】這一部分我提供了一些線索,對這方面感興趣的可以繼續深入去看

Producer

訊息傳送機制

目前生產端的訊息傳送是基於非同步傳送機制實現的,通過 RecordAccumulator 去做了解耦了訊息生產和網路請求

這裡需要先說明一下訊息生產請求的結構:

image.png

注意一下當前版本的 Kafka 只允許每個分割槽有一個批次的訊息,不允許一個請求傳送多個批次

下面這張場景流程圖描述了 RecordAccumulator 兩側各自的處理流程,使用者側呼叫 send 方法之後,訊息被追加到 RecordAccumulator,非同步執行緒輪詢,滿足條件之後呼叫網路客戶端的 send 方法向 Broker 傳送訊息生產請求

流程圖 (11).jpg

Java 版本的實現中,send() 返回的是一個 Future 物件,所以 send().get() 這樣的用法就能起到同步阻塞等待訊息傳送成功再返回的效果,但是本質上還是在非同步傳送訊息

訊息亂序問題

我們通常認為生產者傳送的訊息總是能夠保證分割槽有序,這是一種誤解,因為這裡有一個陷阱,就是 max.in.flight.requests.per.connection 這個客戶端網路配置

查閱 Kafka 的 官方文件 此配置的預設值是 5,表示一個連線中可以同時有 5 個訊息批次在途,文件中也明確指出了由於錯誤重試的關係,這種場景下訊息會亂序

所以,當我們業務上對訊息順序有硬性需求的時候,這個點必須引起重視

訊息分割槽機制

訊息分割槽機制可以認為是生產端的負載均衡機制,下面梳理了一張分割槽計算的流程圖,不同的分支對應不同的分割槽場景

需要注意的一點就是分割槽函式的入參不只是訊息的 Key,Topic、Value、Cluster(叢集元資料)都可以作為該函式的輸入資訊去計算分割槽

流程圖 (12).jpg

小結

這部分針對 Producer 的一些基礎功能進行了一些介紹,對了解 Producer 客戶端的執行已經足夠

另外針對生產側聊的比較多、比較深入的話題應該是分割槽訊息有序性、訊息冪等、精確一次語義等,這些問題單獨展開都是一個大話題,在這裡就不一一討論了

Consumer

成員管理

消費者組是 Kafka 消費端實現負載均衡、動態擴容、故障轉移的重要機制,此機制的執行和流轉需要 Broker 端的 Coordinator 和消費端的 Consumer 通過建立長連線進行互動和狀態流轉來完成此項工作

Coordinator 的定位

這裡插入一個小話題,那就是消費者怎麼知道自己的 Coordinator 在哪個 Broker 上,計算的過程非常簡明,就是根據消費者組名的 HashCode 對 __consumer_offset 主題的分割槽數進行取餘,程式碼如下:

Scala def partitionFor(groupId: String): Int = Utils.abs(groupId.hashCode) % groupMetadataTopicPartitionCount

計算出的分割槽領導者副本所在的 Broker 就是對應 Coordinator 的位置

注意,上述計算過程中所需的各種關於叢集的資訊,在獲取叢集元資料的階段都快取在了本地,這在本文的「架構-叢集架構解析-消費」這一部分已經介紹過了

消費者組狀態機

消費者組有一個狀態集合,整個消費者組就是在這幾個狀態之間流轉的,下面我用表格和狀態機圖例說明這些狀態怎麼流轉的,狀態列表如下:

| 狀態 | 前置狀態 | 備註 | | ------------------- | ------------------------------------------------------ | --------------------------------- | | Empty | PreparingRebalance | 此狀態同時也是初始狀態 | | PreparingRebalance | Empty, CompletingRebalance, Stable | | | CompletingRebalance | PreparingRebalance | | | Stable | CompletingRebalance | 轉移條件一般是 Coordinator 收到領導者發來的組同步請求 | | Dead | Empty, PreparingRebalance, CompletingRebalance, Stable | 通常是 Coordinator 出現轉移會導致組狀態變成 Dead |

狀態流轉圖如下:

流程圖 (13).jpg

上圖展示的是所有可能的狀態流轉路徑,對一個新建立的消費者組來說,符合預期的流轉路徑是 1 → 3 → 5,下一小節介紹重平衡機制的時候會詳細說明流轉過程

此外,這個狀態流轉圖中有一個危險的“死亡迴圈”,也就是 3 ⇆ 4 這兩條路徑組成的迴圈,下面介紹的重平衡機制與之相關

重平衡機制

重平衡機制是整個消費者組管理的重要機制,因為消費者組加入、退出、消費方案的分配這些核心功能基本都囊括其中

下面我將以一個新建立的消費者組為例,介紹一下重平衡機制,案例場景的情況如下:

  • Coordinator 位於 Broker 2 中,對應的消費者組成員只與其互動
  • 待加入的消費者組當前不存在
  • 三個消費者,且都能在規定的超時時間內入組併成功
  • Consumer 0 最先發起入組請求並被處理

整個重平衡分兩個大的階段進行,第一階段申請入組,主要是等待所有待加入消費者組的成員入組並分配成員 ID,第二階段組同步,主要是將消費者組領導者生成的分割槽消費方案向全組成員進行同步

重平衡的場景流程圖如下:

流程圖 (14).jpg

階段一按時序關係細分了幾個步驟:

  • 步驟一 Consumer 0 發起入組請求
  • 步驟二因為沒有成員 ID 入組請求被 Coordinator 拒絕並返回了一個有效的成員 ID
  • 步驟三 Consumer 0 帶入步驟二返回的成員 ID 再次入組併成功,Consumer 0 入組成功之後,其他成員陸續發起入組請求
  • 步驟四 Coordinator 直接賦予其領導者身份,因為是第一個入組成功的成員

這一階段整個消費者組狀態從 Empty → PreparingRebalance,觸發原因是步驟三有消費者申請入組成功(步驟一、二未觸發原因是沒有成員 ID 導致入組失敗)

流程圖 (15).jpg

階段二按照時序關係分這麼幾個步驟:

  • 步驟一入組等待時間結束,向所有消費者傳送入組成功結果
  • 步驟二所有消費者向 Coordinator 傳送組同步請求,領導者 Consumer 0 傳送的同步請求中攜帶了基於入組成功結果計算的整個消費者組的分割槽消費方案
  • 因為步驟二收到了消費者組的分割槽消費方案,所以步驟三 Coordinator 向組成員廣播了這個方案

這一階段消費者組狀態從 PreparingRebalance → CompletingRebalance → Stable,觸發原因分別是:

  1. 入組等待時間結束
  2. 領導者發起了組同步請求

除了新建的消費者組之外,已有的消費者組因為很多事件也會觸發重平衡機制,而且整個平衡的過程和這裡的案例會有所區別

這裡舉了個例子只是為了幫助讀者對整個重平衡過程有個大體的印象,瞭解整個過程中發生的主要流程,其他場景下的重平衡過程就不一一舉例鋪開敘述了

分割槽消費方案

分割槽消費方案的形成主要考慮兩個步驟:

  1. 消費方案的生成
  2. 消費方案的分發

步驟 2 在上一節已經介紹過了,這裡就說一下步驟 1

如果消費者入組成功之後被指定為了領導者,那麼後續它傳送的組同步請求中就帶入了已經生成好的分割槽分配方案

生成消費方案常見的策略是這兩個:

  • org.apache.kafka.clients.consumer.RangeAssignor
  • org.apache.kafka.clients.consumer.CooperativeStickyAssignor

值得注意的是在 Kafka 的架構上消費方案是由消費者負責生成的,主要原因我想除了效能考量之外,還有一個原因就是為了更方便消費者自定義消費策略

不均衡分配問題

我們通常說的分割槽分配策略預設是 RangeAssignor,該策略按照主題進行分配,儘量保證每個主題的分割槽在消費者之間儘量平均繫結

不過這種分配策略在實現上有些問題:

  1. 對消費者列表進行了排序
  2. 排序後的順序,按主題進行迴圈分配

因為對消費者做了排序再分配,會導致排序後的最後一個消費者總是分配到比其他消費者少的分割槽,造成不均衡的分配方案

流程圖 (16).jpg

每一個 Topic 的分配都是不均衡的,這個偏差會逐漸累積

其他的分配策略或多或少都有類似的問題,選擇業務上使用的分配策略時需要注意這一點

心跳保活機制

消費者加入消費者組之後,還需要保活機制維持其組成員的這個身份,保活主要通過兩條路徑來進行:

  1. 客戶端每次 poll 嘗試拉取訊息,Consumer 中執行在非同步執行緒的 ConsumerCoordinator 會判定兩次 poll 的時間間隔是否超出 max.poll.interval.ms 設定的值,超過則判定失效發起主動離組請求
  2. 非同步執行緒定時傳送心跳包,間隔超過session.timeout.ms 則服務端判定失效,強制剔除出消費者組

如果兩者之一失效消費者會被移出消費者組並觸發重平衡機制,整個過程和上面介紹的重平衡機制類似

要注意上面兩條路徑一個是客戶端本地判定,另一個是服務去判定的,第一條因為是客戶端的實現,有些語言的客戶端可能沒這個機制

小結

消費者組也是一個獨立的分散式服務叢集,執行著業務程式碼,靠 Kafka 進行分散式場景下的協調和資料持久化工作

正因為消費端也是一個分散式系統,所以分散式場景下的所有問題在這裡都同樣存在:生產端分割槽、消費端分割槽分配機制出現問題都可能引起資料傾斜;消費者數量上去總會有節點失效的情況出現,需要對應的災備機制進行處理;消費者之間依賴中心化的服務去協調排程,進行消費任務的分配,中心化服務的失效同樣會引起消費者組的故障

業務場景的挑戰

固然 Kafka 在設計之初對需要面臨的挑戰做了充分的設計和論證,但是面臨真實的使用場景它的表現究竟如何?

在我們平時的業務開發場景裡,使用到 Kafka 主要是做一個分散式微服務下的非同步事件的生產和消費,而且絕大部分業務不會有非常大的訊息吞吐量,在這些場景下 Kafka 的效能表現優異,對業務來說無法感知到效能瓶頸

但是在極端場景下,比如大資料分析的場景中,系統資料吞吐量很大,Kafka 叢集中的各個元件都在承受巨大壓力,任何一個單個元件的故障失效或者行為異常,都可能在叢集內大範圍擴散導致雪崩,引起系統性能的急劇下降,甚至是故障時效無法提供服務

至於可能造成這些不符合預期表現的原因,從上面介紹的架構、機制中我們可以找到這些問題的答案

ZK 的巨大壓力

在 1000+ Broker 的場景下,主題、分割槽、消費者組的數量巨大,需要在 Zookeeper 中儲存大量的資料,而且這麼多的節點在叢集執行的過程中會頻繁產生節點和資料的變動,觸發事件通知 ZK 客戶端

這裡有個案例 生產故障|Kafka訊息傳送延遲達到幾十秒的罪魁禍首竟然是... | HeapDump效能社群 分享的就是 ZK 對客戶端請求的處理延遲過高,心跳包無法及時處理引起 Controller 和 Broker 大面積掉線,訊息無法寫入

ZK 是強一致性的儲存系統,寫入效能不佳,面對如此高頻率的寫入請求自然是很難應付的過來,是約束叢集規模進一步擴張的重要條件

因此為了突破這一約束,進一步釋放 Kafka 作為高效能訊息引擎的潛力,在新發布的版本中自行實現了一套分散式一致性協議 KRaft 並支援 Controller 獨立部署

圖片來源:https://developer.confluent.io/learn/kraft/

目前 KRaft 版本的 Kafka 在生產環境上落地的案例很少,後續我會持續關注新機制給 Kafka 帶來的變化和效能提升

另外,這裡的舉例只是用了一個比較誇張的叢集規模,受限於硬體配置和軟體版本等原因,實際的叢集可能在幾十個 Broker 又或者 Topic 非常多的場景下就會出現 ZK 的效能瓶頸

不堪重負的 Controller

前面的叢集架構部分我們已經瞭解到,所有的 Broker 中都有一個 Controller 角色,但是同時只有一個對外提供服務,這裡討論一下這個叢集唯一的 Controller 的負載問題

同樣是考慮在 1000+ Broker 叢集的場景下,Controller 所在的 Broker 負載會比其他 Broker 大,因為要處理整個叢集範圍內所有叢集管理相關的請求,那麼這個 Broker 就很可能因為負載過大導致節點失效,引起 Controller 選舉和故障轉移

在小規模的叢集中這樣的故障轉移可以很快速,代價很小,但是在我們現在討論的場景中叢集元資料很多,同時伴隨著大量的主題和分割槽訊息資料,整個故障轉移的代價非常大

轉移過程中可能出現的一些異常情況:

  • Controller 選舉過程時間長,選舉期間無法執行新建主題、分割槽擴容等操作
  • Broker 之間進行分割槽副本資料的轉移,大量的檔案讀寫導致頁快取大規模失效,Broker 無法讀取到到頁快取,也加入到了頻繁的 IO 操作中進一步惡化 IO 效能
  • 沒有 Controller 導致叢集元資料無法及時更新,導致客戶端獲取到無效的資料,無法正常工作

Controller 在叢集中的地位非常重要,Kafka 及其類似的訊息系統都對這一個元件做了諸多重構和優化,形成了不同的解決方案:

  1. 可以將叢集中的幾個 Broker 獨立出來,提升硬體配置,專門負責 Controller 選舉
  2. BMQ [3] 對 Kafka 的這部分功能進行了重構

不穩定的消費者

在這裡我們考慮一下實際消費場景下的情況,假設有一個 100+ 消費者的消費組

前面我們已經介紹了一種場景下的重平衡機制,這裡需要討論關於重平衡對業務的影響,因為發起重平衡之後,消費者組就無法繼續消費資料了,必須要等到消費者組重新進入穩定狀態才可以繼續消費

理想情況下,消費者成功入組之後就能持續消費,穩定執行,但是實際場景中面臨如下挑戰:

  • 首次入組,因為不同消費者啟動速度有差異,導致 99 個消費者成功入組之後,最後一個消費者申請入組觸發重平衡(預設是等待 3s 進入 PrepareRebalancing)
  • 消費者消費過程中,因為資料傾斜部分消費者負載高,因 GC 等原因下線或心跳超時,觸發重平衡
  • 消費者組執行過程中,發現消費進度跟不上,故對消費者組擴容觸發重平衡

重平衡的代價很大,需要等所有消費者停止消費,然後開啟申請入組、組同步的這個流程,整個重平衡期間消費者組無法消費將加劇訊息消費的延遲

所以在這種消費者數量多的情況下,保證每個消費者能夠穩定執行非常重要,避免因 GC 或者網路抖動等內外因素觸發重平衡

雖然 Kafka 提供了消費者組這樣的機制去幫助實現消費端的負載均衡和彈性擴容,但是這種擴容也是有邊界的,消費叢集的規模也不是能夠無限擴張的,保證消費叢集的穩定性是個很大問題

針對消費場景的重平衡問題,比較常見的做法是繞過這套機制自行管理分割槽的消費,比如我接觸過的 Spark 和 Flink 大資料計算框架就是主要使用自行分配繫結分割槽消費,並且不使用 Kafka 提供的訊息偏移管理機制或僅作為輔助手段

業務上也可以參考這種方案去實現一套消費方案的管理機制,對出現故障的消費者予以告警和及時介入,隔離故障節點和對應的分割槽,不要影響其他分割槽的正常消費

至於 Kafka 提供的消費者組靜態成員的機制,這個業務案例不多,就不做介紹了

不可靠的程式碼

核心機制中介紹了生產者的訊息分割槽函式,這是生產端負載均衡的重要機制,最常見的無 Key 或者使用雜湊值計算分割槽的場景下,Key 總是能在分割槽中均勻分佈

實際業務場景中分割槽函式不一定按照我們預期的行為向 Broker 分發訊息,因為程式碼問題還是可能導致Key 的計算不符合預期,分割槽資料產生傾斜,引起部分 Broker 負載過高

因為在 Kafka Core 叢集的架構裡儲存和計算沒有分離,這種場景下因為儲存導致的壓力無法向其他 Broker 均攤,反而會連累整個 Broker 一起掛掉

此外,除了 Key 分割槽引起的資料傾斜之外,過大的訊息體也可能造成問題(比如把整個檔案當成訊息體傳送),如果因為程式碼錯誤向某個分割槽持續傳送比較大的訊息體造成資料傾斜(實際情況沒有這麼誇張,因為服務端對單批次的訊息最大值有限制,預設是 1048588 Bytes ≈ 1MB)

如果是把 Kafka 當成檔案系統來使用確實可能出現這個問題,因此大檔案的非同步消費最好是隻傳遞檔案的元資訊

故障轉移帶來的系統衝擊

前面幾個場景已經多次提到了故障轉移場景下的種種問題,這裡做個總結性質的陳述

故障轉移機制解決的是可用和自動化的問題,保證部分節點失效的情況下,系統整體是可用的,能夠繼續對外提供服務,而自動化能夠保證故障的第一時間有一個應對機制,降低對業務的影響,給後續的人工處理和介入爭取時間

因此在故障轉移的場景下,我們必須考慮因為負載在節點間的重新分配,導致的節點負載變化,優化整個轉移過程,提高整個服務的可用性,避免因為故障轉移導致服務的嚴重降級,影響使用者體驗,或者形成儘管服務還活著但是產品事實不可用的局面

故障轉移是從系統失衡進入到另一個穩態,這個轉移過程必定是對原系統有衝擊的,經過一段時間的重新平衡之後再回到穩定狀態

在自動化的機制之外,還需要建立業務上的響應機制,提前準備好災備方案,以便出現故障時能夠及時人工介入

故障轉移是一個非常值得深入討論的技術話題,這篇文件對這個問題無法進行太深入的探究,這裡就淺談幾句

總結

Kafka 誕生在雲原生的概念尚未出現的時代,從儲存底層開始一步一步構建出一套自成一體的分散式訊息系統,成為業界標準的訊息引擎

通過對 Kafka 架構和核心機制的深入瞭解,我們不難發現為了實現這個訊息引擎的高可用、高效能和強一致,可以說各方面都做到了極致優化,而且每個優化的環節都是彼此關聯,環環相扣,甚是巧妙

比如 Kafka 訊息索引的設計就是一個好的例子,不止是提升了通過訊息偏移查詢的速度,更是利用到這個資訊去實現了訊息消費階段的零拷貝,可以說是做到了一箭雙鵰,效能加倍的效果

另外,Kafka 是分散式系統的一種工程實現,不是純理論的理想模型,如果按照 CAP 理論對其進行生搬硬套一定是行不通的,這也是文章的前面我都沒有用這個理論先去解釋一番的原因

按照常見的理論解釋,Kafka 應該是 AP 系統,用訊息分割槽進行負載均衡提升效能,用副本機制保證 Broker 失效的場景下的可用性,副本之間的訊息是最終一致性的關係

很顯然,這樣的 AP 系統是無法在線上使用的(想想看 unclean.leader.election.enable 引數線上叢集能用否?),副本之間是純非同步複製,隨時會有未同步副本上線對外提供服務,沒有人願意承擔這種因為資料一致性問題帶來的訊息永久丟失風險

所以 Kafka 在設計和實現的時候就是在 CAP 三角中結合自己系統的應用場景做出了取捨,形成了有限的分割槽容錯(分割槽失效服務會短暫不可用)、整體可用(叢集可以部分失效)、弱一致性(高水位和副本選舉機制)的可能三角,各方面都有兼顧

你以為的 AP 系統,其實是……

也正是因為種種現實原因的限制,Kafka 的設計和實現上註定有不完美的地方,也有很多的歷史遺留問題,這些問題我們文章的前面也都有提到,這些問題或大或小,正是這些問題的存在進一步推動了 Kafka 自身的迭代進化,也催生出了其他的可能性

Pulsar 這樣的雲原生訊息系統就是在架構上對 Kafka 計算儲存一體的架構做了改進,儲存計算分離,充分利用了當今雲原生環境的帶來的擴充套件性優勢,成為當下熱門的專案

此外還有位元組自研的 BMQ 這樣的訊息中介軟體,同樣對 Kafka 架構還有內部核心機制的實現上做了重構和一些優化,去支撐自己的業務需求

Pulsar 的架構圖如下,可以明顯看到叢集架構中的儲存與計算分離的設計,負責儲存的 Bookie 是獨立的叢集,與訊息引擎解耦,BMQ 也有類似的設計,只不過用的元件不同

總的來說 Kafka 作為一個出道十幾年的專案,久經考驗,生命力頑強,確實是訊息系統的經典設計

學習資源

原始碼學習

入門指南

Kafka 是 Java 和 Scala 語言混合的專案,目前 IDEA 對這種專案的支援度最好,想要學習原始碼的話推薦用這個 IDE,省去很多解決環境問題的時間

開始閱讀服務端程式碼的話,推薦從兩個地方開始:

  • KafkaApis 這個類是各種外部請求 Handler 的入口,從這裡能看到 Kafka 各種介面的實現
  • KafkaServer 這個類是服務端 Broker 對應的實現類,從這裡作為入口能學習到整個服務端啟動的流程

閱讀客戶端程式碼的話,Producer 可以關注 RecordAccumulator 這個類,這是解耦本地訊息快取和網路傳送執行緒的樞紐

Consumer 應該關注 ConsumerCoordinator 的實現,關於消費者組管理客戶端的實現都在這裡,想了解重平衡等客戶端核心機制都需要在這裡找答案

程式碼導航

我把原始碼中出現的重要角色按照類名做了梳理,介紹了主要職責,供大家學習原始碼的時候參考

image.png

KIP

KIP 是 Kafka Improvement Proposal[5] 的縮寫,一些重大的功能變更都是用這樣的改進提案的方式發起的,這些提案對了解一些內部機制實現的動機和過程有非常大的幫助,屬於非常優質的第一手學習資料

這篇文章中引用了很多經典的 KIP,這裡梳理了一個列表出來方便後續對社群動態的跟進和原始碼的學習

| KIP 編號 | 提議內容 | 備註 | | ---------------------------------------------------------------------------------------------------------------------------------------- | ----------------------------------------------------- | ----------------------------------------------- | | KIP-63 | 允許消費者從後臺執行緒傳送心跳包 | | | KIP-82 | 訊息新增頭部資訊(Headers) | 除了訊息體之外,允許新增鍵值對形式的頭部資訊,用於路由或者過濾等功能 | | KIP-106 | unclean.leader.election.enabled 配置的預設值從 true 變為 false | | | KIP-134 | 延遲首次初始化消費者組的重平衡操作 | | | KIP-392 | 允許消費者從最近的副本拉取資料 | 消費者的定義不含從副本向領導者副本的消費角色 | | KIP-500 | 移除對 Zookeeper 的依賴,基於 Raft 協議開發出一套新的選舉協議進行叢集元資料的管理 | | | KIP-516 | 主題唯一識別符號 | 閱讀 Motivation 的部分了解為什麼要新增這個識別符號即可,其他的技術細節不是特別重要了 | | KIP-555 | Kafka 管理工具中廢棄與 Zookeeper 的直連功能 | | | KIP-584 | 叢集功能版本控制 | 用於解決叢集滾動升級,以及不同版本客戶端相容的場景下的一些問題 | | KIP-630 | Kafka Raft 快照 | 屬於 KIP-500 的後續功能性 KIP | | KIP-631 | 基於選舉的 Kafka Controller | 屬於 KIP-500 的後續功能性 KIP |

其他

  • Kafka 官方文件[1] 是最權威的第一手資料,平時用來當手冊查詢各種配置的文件非常方便
  • BMQ 的設計文件[3] Motivation 的部分指出了 Kafka 作為大規模訊息佇列叢集的問題,整個設計文件就是對 Kafka 的再思考和重構,是從另一個角度審視 Kafka 的好材料
  • Why BMQ [4] 這篇文件針對不同實際運維場景,將 Kafka 和 BMQ 進行對比,指出了各自優缺點

參考文獻

  1. Kafka 官方文件
  2. Apache Kafka® Internal Architecture - The Fundamentals
  3. BMQ 設計文件
  4. Why BMQ 9.Kafka Improvement Proposal (KIP)

加入我們

掃碼發現職位 & 投遞簡歷:

image.png

官網投遞:job.toutiao.com/s/FyL7DRg