kafka客戶端訊息傳送邏輯

語言: CN / TW / HK

【引言】


最近遇到了一個和kafka相關的問題,具體是在spark任務在一定並行度的情況下, 偶現個別executor因kafka訊息傳送超時導致失敗的情況。正所謂磨刀不誤砍柴工,為了能較好的定位問題,因此先對kafka客戶端訊息傳送相關邏輯的程式碼進行了走讀,本文就是對相關原理的一些總結。

【相關概念(資料結構)】


在客戶端裡,一些重要的概念或對應的資料結構包括:

1. ProducerRecord

生產者傳送的每條訊息,都對應一個ProduceRecord類例項物件,記錄了包括訊息的key,value,時間戳,header,topic,partition資訊。

2. ProducerBatch

客戶端傳送訊息時,並不是呼叫send介面傳送一條訊息,就實際將該訊息通過網路傳送出去,而是攢夠一批進行傳送。在具體實現中,ProducerBatch就對應這個批的概念。ProducerBatch本質上是一批訊息的集合,也就是上面ProduceRecord中的key、value、header經過序列化後的位元組資料儲存在ProducerBatch中。

3. RecordAccumlator

RecordAccumlator是上面ProducerBatch的集合。由於訊息只能發往topic的某一個分割槽,發往同一個topic的一個或多個訊息組成ProducerBatch,多個ProducerBatch組成一個連結串列,在RecordAccumlator內部,則以topic的分割槽為key,ProducerBatch連結串列為value,快取所有待發送的訊息。

一個簡單的示例如下圖所示:

 

4. BufferPool

一塊大的記憶體池,儲存訊息記錄序列化後的位元組資料,即ProducerBatch中用於存放具體訊息內容的記憶體就是從BufferPool中申請的。

在BufferPool內部,分為兩種型別的記憶體,一種是固定大小的記憶體,這些記憶體先從系統申請,使用完(訊息傳送完畢並確認收到)後,回收後放到記憶體池中,以便後續使用;另一種是不固定大小的記憶體,通常是大於一個批大小的記憶體,這些記憶體也是直接從系統申請,但使用完成後,不會放到記憶體池,而是等觸發垃圾回收時,被系統回收掉

5. Sender

專門的訊息傳送執行緒,定時從RecordAccumlator取出一批訊息併發送給服務端

6. NetworkClient

負責與所有broker通訊,包括與broker建立連線,協議上的互動(將訊息按指定協議格式傳送,定時更新元資料等等),以及處理broker的響應訊息。

如果從全域性的視角來看,kafka客戶端的架構可能是這樣的一個分層:

 

【訊息傳送流程】


從上面的介紹中,以及可以猜出大概的訊息處理流程。簡單概括客戶端訊息傳送的邏輯就是:業務執行緒(呼叫producer.send()的執行緒)將訊息序列化,並存放到ProducerBatch中,然後按需喚醒sender傳送執行緒;傳送執行緒從RecordAccumlator挑選出待發送的ProducerBatch列表,並按照指定協議格式構造請求,然後傳送給topic分割槽leader對應的broker,接著接收服務端的響應,並進行處理以及回撥通知

展開來說的話,流程如下圖所示:

還是分為兩部分,在業務執行緒中:

  • 呼叫send介面後,介面內部會對訊息(ProduceRecord)中key、value進行序列化,然後根據key選擇一個對應的分割槽,預設情況下是輪詢選擇

  • 然後根據選定好的分割槽,找到對應的ProducerBatch列表,並嘗試追加寫入到最後一個batch中,如果能成功追加,則直接返回該batch

  • 如果不能成功追加,則從BufferPool中申請一塊記憶體,如果訊息的大小超過一個batch指定的大小,則實際訊息大小來申請,如果不足一個batch的大小,則按一個batch大小來申請

  • 申請到記憶體後,構造一個ProducerBatch,並將訊息新增到其中,隨後將該batch新增到對應的ProducerBatch列表尾部(以保證同一分割槽訊息的順序),最後返回該batch

  • 外層對batch進行判斷,即該batch是否寫滿或是否為新建立的batch,如果是則喚醒傳送執行緒進行工作,如果不是就等待發送執行緒定時傳送(這就好比很多旅遊景點中接駁車的邏輯一樣,客流高峰期,滿了就走,平峰期準點才走)

在傳送執行緒中:

  • 在傳送時,先對所有ProducerBatch列表中的batch進行篩選,過濾掉沒有leader的分割槽,然後彙總分割槽leader的broker節點集合(首次與任一broker連線後,會自動獲取服務的元資料資訊,這就包括每個topic的每個分割槽,其leader所在的節點,因此可以根據分割槽篩選出對應需要傳送的broker集合)

  • 然後判斷這些broker節點是否準備好,例如連線是否建立,是否還可以繼續向其傳送訊息(可能之前持續傳送了很多訊息導致tcp視窗滿了)等,對於未準備好的節點先從集合中移除

  • 根據已經準備好的broker節點,挑選對應分割槽中ProducerBatch連結串列頭的batch,並從連結串列中移除,作為本次真正待發送的批資料

  • 接著過濾ProducerBatch中超時的batch,直接對這些batch進行回撥通知。

  • 然後才是將呼叫KafkaNetwork的介面,將批訊息按指定協議封裝傳送。

  • 最後通過IO事件回撥,處理服務端的響應(包括訊息的應答並逐層回撥處理,可能的連線斷開等等)

【有關的配置】


一些常用的,並且和上面流程或概念有關的引數包括:

1. buffer.memory

bufferPool的總大小,預設大小為32MB,每次分配後可用空間減少,當使用完回收後,可用空間又對應增加。如果單次申請的記憶體大於這個值,會直接拋異常;而如果BufferPool中剩餘可用空間的值不滿足條件時,則會阻塞執行緒,直到已有訊息傳送完成被釋放後,會通知該執行緒解除阻塞,重新分配。

2. batch.size

一個ProducerBatch的訊息,也是BufferPool中記憶體池裡每個記憶體塊的大小。預設大小為16KB。如果單條訊息的大小大於這個值,則按實際大小從BufferPool中申請;如果單條訊息的值小於這個值,則以該值為單位從BufferPool中申請。另外,當有新的訊息寫入時,如果一個ProducerBatch還未寫滿,並且剩餘空間足夠儲存該訊息,那麼則會追加寫到該ProducerBatch中。這也就意味著,一個ProducerBatch裡包含一條大於該值的訊息,或包含多條小於該值的訊息

3. request.timeout.ms

ProducerBatch的超時時間。每次往ProducerBatch追加寫時,會更新追加時間,如果ProducerBatch的最後更新時間距離當前時間超過了傳送超時時間,那麼則認為是傳送超時。並提示“ xxx ms has passed since last append”

4. linger.ms

前面訊息傳送流程裡提到了,單條訊息並不是立即傳送的,而是攢夠一批發送,那麼如果後續一直沒有訊息了,那是不是也就一直不傳送了呢?顯然不是這樣,一個ProducerBatch最長等待時間就是由linger.ms來決定的,sender執行緒在從ProducerBatch的表頭取出ProducerBatch時,會根據當前時間與ProducerBatch的最近一次傳送時間(如果沒有傳送則是ProducerBatch的建立時間)進行比較,如果小於linger.ms指定的時間,則不進入本次真正待發送的列表中,同時計算出剩餘時間,這其實就是後續poll輪詢與broker的連線,等待IO事件的時間。

另外,如果當前時間減去ProducerBatch的建立時間,大於傳送超時時間與linger.ms時間之和,那麼也會導致ProducerBatch的傳送超時。

【總結】


總結一下,通過本文的介紹,應該對kafka客戶端內部的整體設計、訊息儲存傳送流程有了個簡單的認識,遇到一些報錯時,也能從流程上進行初步的分析定位,至於深層次的問題,那就還需要再對原始碼深入分析,而本文開頭提到的問題,由於問題未復現,這裡也就沒有近一步分析說明,後續再結合問題對內部原理展開說明。

 

好了,這就是本文的全部內容,如果覺得本文對您有幫助,不要吝嗇點贊在看轉發,也歡迎加我微信交流~

本文分享自微信公眾號 - hncscwc(gh_383bc7486c1a)。
如有侵權,請聯絡 [email protected] 刪除。
本文參與“OSC源創計劃”,歡迎正在閱讀的你也加入,一起分享。