淺談kafka

語言: CN / TW / HK

作者:京東科技 徐擁

入門

1、什麼是kafka?

apache Kafka is a distributed streaming platform. What exactly dose that mean?





 

Apache Kafka 是訊息引擎系統,也是一個分散式流處理平臺(Distributed Streaming Platform)

2、kafka全景圖:





 

3、Kafka的版本演進:





 





 

4、kafka選型:

Apache Kafka:也稱社群版 Kafka。優勢在於迭代速度快,社群響應度高,使用它可以讓你有更高的把控度;缺陷在於僅提供基礎核心元件,缺失一些高階的特性。(如果你僅僅需要一個訊息引擎系統亦或是簡單的流處理應用場景,同時需要對系統有較大把控度,那麼我推薦你使用 Apache Kafka)

Confluent Kafka :Confluent 公司提供的 Kafka。優勢在於集成了很多高階特性且由 Kafka 原班人馬打造,質量上有保證;缺陷在於相關文件資料不全,普及率較低,沒有太多可供參考的範例。(如果你需要用到 Kafka 的一些高階特性,那麼推薦你使用 Confluent Kafka。)

CDH/HDP Kafka:大資料雲公司提供的 Kafka,內嵌 Apache Kafka。優勢在於操作簡單,節省運維成本;缺陷在於把控度低,演進速度較慢。(如果你需要快速地搭建訊息引擎系統,或者你需要搭建的是多框架構成的資料平臺且 Kafka 只是其中一個元件,那麼我推薦你使用這些大資料雲公司提供的 Kafka)

5、Kafka的基本概念:





 



6、Kafka的基本結構:





 

7、Kafka的叢集結構:





 

8、kafka的應用場景(使用者註冊/非同步):





 

9、kafka佇列模式---點對點:





 

10、kafka佇列模式---釋出/訂閱:





 

11、kafka構成角色:

1、broker:

訊息格式: 主題 - 分割槽 - 訊息 、主題下的每條訊息只會儲存在某一個分割槽中,而不會在多個分割槽中被儲存多份

這樣設計的原因是:不使用多topic做負載均衡,意義在於對業務遮蔽該邏輯。業務只需要對topic進行傳送,指定負載均衡策略即可 同時 topic分割槽是實現負載均衡以及高吞吐量的關鍵

Topic的建立流程





 



2、Producer:

傳送訊息流程





 

3、Consumer:

Kafka消費者物件訂閱主題並接收Kafka的訊息,然後驗證訊息並儲存結果。Kafka消費者消費者組的一部分。一個消費者組裡的消費者訂閱的是同一個主題,每個消費者接收主題一部分分割槽的訊息。消費者組的設計是對消費者進行的一個橫向伸縮,用於解決消費者消費資料的速度跟不上生產者生產資料的速度的問題,通過增加消費者,讓它們分擔負載,分別處理部分分割槽的訊息





 



4、Consumer Group:

它是kafka提供的具有可擴充套件且可容錯的消費者機制

特性:

1、 Consumer Group下可以有一個或多個 Consumer例項;

2、在一個Katka叢集中,Group ID標識唯一的一個Consumer Group;

3、 Consumer Group 下所有例項訂閱的主題的單個分割槽,只能分配給組內的 某個Consumer例項消費。

Consumer Group 兩大模型:

1、如果所有例項都屬於同一個Group,那麼它實現的是訊息佇列模型;

2、如果所有例項分別屬於不同的GrouD,那麼它實現的就是釋出/訂閱模型。

12、Kafka的工作流程:





 



13、Kafka常用命令:





 

進階

14、Kafka的檔案儲存機制—log:





 

15、Kafka的檔案儲存機制—分片/索引:





 

16、Kafka的檔案儲存機制—index/log:





 

17、kafka 如何支援百萬QPS?

順序讀寫 :

生產者寫入資料和消費者讀取資料都是順序讀寫的





 



Batch Data(資料批量處理):

當消費者(consumer)需要消費資料時,首先想到的是消費者需要一條,kafka傳送一條,消費者再要一條kafka再發送一條。但實際上 Kafka 不是這樣做的,Kafka 耍小聰明瞭。Kafka 把所有的訊息都存放在一個一個的檔案中,當消費者需要資料的時候 Kafka 直接把檔案傳送給消費者。比如說100萬條訊息放在一個檔案中可能是10M的資料量,如果消費者和Kafka之間網路良好,10MB大概1秒就能傳送完,既100萬TPS,Kafka每秒處理了10萬條訊息。

MMAP(記憶體對映檔案):

MMAP也就是記憶體對映檔案,在64位作業系統中一般可以表示 20G 的資料檔案,它的工作原理是直接利用作業系統的 Page 來實現檔案到實體記憶體的直接對映,完成對映之後對實體記憶體的操作會被同步到硬碟上。





 

通過MMAP技術程序可以像讀寫硬碟一樣讀寫記憶體(邏輯記憶體),不必關心記憶體的大小,因為有虛擬記憶體兜底。這種方式可以獲取很大的I/O提升,省去了使用者空間到核心空間複製的開銷。也有一個很明顯的缺陷,寫到MMAP中的資料並沒有被真正的寫到硬碟,作業系統會在程式主動呼叫 flush 的時候才把資料真正的寫到硬碟。

Zero Copy(零拷貝):

如果不使用零拷貝技術,消費者(consumer)從Kafka消費資料,Kafka從磁碟讀資料然後傳送到網路上去,資料一共發生了四次傳輸的過程。其中兩次是 DMA 的傳輸,另外兩次,則是通過 CPU 控制的傳輸。





 

第一次傳輸:從硬碟上將資料讀到作業系統核心的緩衝區裡,這個傳輸是通過 DMA 搬運的。

第二次傳輸:從核心緩衝區裡面的資料複製到分配的記憶體裡面,這個傳輸是通過 CPU 搬運的。

第三次傳輸:從分配的記憶體裡面再寫到作業系統的 Socket 的緩衝區裡面去,這個傳輸是由 CPU 搬運的。

第四次傳輸:從 Socket 的緩衝區裡面寫到網絡卡的緩衝區裡面去,這個傳輸是通過 DMA 搬運的。

實際上在kafka中只進行了兩次資料傳輸,如下圖:





 

第一次傳輸:通過 DMA從硬碟直接讀到作業系統核心的讀緩衝區裡面。

第二次傳輸:根據 Socket 的描述符資訊直接從讀緩衝區裡面寫入到網絡卡的緩衝區裡面。

我們可以看到同一份資料的傳輸次數從四次變成了兩次,並且沒有通過 CPU 來進行資料搬運,所有的資料都是通過 DMA 來進行傳輸的。沒有在記憶體層面去複製(Copy)資料,這個方法稱之為零拷貝(Zero-Copy)。

無論傳輸資料量的大小,傳輸同樣的資料使用了零拷貝能夠縮短 65%的時間,大幅度提升了機器傳輸資料的吞吐量,這也是Kafka能夠支援百萬TPS的一個重要原因

18、壓縮:

特性:

節省網路傳輸頻寬以及 Kafka Broker 端的磁碟佔用。

生產者配置 :

compression.type

Properties props = new Properties();

props.put("bootstrap.servers", "localhost:9092");

props.put("acks", "all");

props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");

props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

// 開啟GZIP壓縮

props.put("compression.type", "gzip");

Producerproducer = new KafkaProducer<>(props)

broker開啟壓縮:

Broker 端也有一個引數叫 compression.type 預設值為none,這意味著傳送的訊息是未壓縮的。否則,您指定支援的類​​型:gzip、snappy、lz4或zstd。 Producer 端壓縮、Broker 端保持、Consumer 端解壓縮。

broker何時壓縮:

情況一:Broker 端指定了和 Producer 端不同的壓縮演算法。(風險:可能會發生預料之外的壓縮 / 解壓縮操作,表現為 Broker 端 CPU 使用率飆升)

想象一個對話:

Producer 說:“我要使用 GZIP 進行壓縮。

Broker 說:“不要,我這邊接收的訊息必須使用配置的 lz4 進行壓縮

情況二: Broker 端發生了訊息格式轉換 (風險:涉及額外壓縮/解壓縮,且 Kafka 喪失 Zero Copy 特性)

Kafka 共有兩大類訊息格式,社群分別稱之為 V1 版本和 V2 版本

為了相容老版本的格式,Broker 端會對新版本訊息執行向老版本格式的轉換。這個過程中會涉及訊息的解壓縮和重新壓縮

訊息何時解壓縮:

Consumer:收到到壓縮過的訊息會解壓縮還原成之前的訊息。

broker:收到producer的訊息 壓縮演算法和自己的不一致/相容新老版本的訊息格式

壓縮演算法對比:





 

以 Kafka 為例,吞吐量方面:LZ4 > Snappy > zstd 和 GZIP;壓縮比方面,zstd > LZ4 > GZIP > Snappy;

具體到物理資源,使用 Snappy 算法佔用的網路頻寬最多,zstd 最少,這是合理的,畢竟 zstd 就是要提供超高的壓縮比;

在 CPU 使用率方面,各個演算法表現得差不多,只是在壓縮時 Snappy 演算法使用的 CPU 較多一些,而在解壓縮時 GZIP 演算法則可能使用更多的 CPU;

19、Exactly-Once(ACK應答機制):

1、At Least Once

最少傳送一次,Ack級別為-1,保證資料不丟失

2、At Most Once

最多傳送一次,Ack級別為1,保證資料不重複

3、冪等性

保證producer傳送的資料在broker只持久化一條

4、Exactly Once(0.11版本)

At Least Once + 冪等性 = Exactly Once

要啟用冪等性,只需要將Producer的引數中 enable.idompotence設定為 true即可。 Kafka的冪等性實現其實就是將原來下游需要做的去重放在了資料上游。

20、producer如何獲取metadata:

1:在建立KafkaProducer例項時 第一步:生產者應用會在後臺建立並啟動一個名為Sender的執行緒,

2:該Sender執行緒開始執行時,首先會建立與Broker的連線。 第二步:此時不知道要連線哪個Broker,kafka會通過METADATA請求獲取叢集的元資料,連線所有的Broker。

3:Producer 通過 metadata.max.age.ms定期更新元資料,在連線多個broker的情況下,producer的InFlightsRequests中維護著每個broker的等待回覆訊息的佇列,等待數量越少說明broker處理速度越快,負載越小,就會發到哪個broker上

21、kafka真的會丟訊息嗎?

kafka最優配置:

Producer:

如果是Java客戶端 建議使用 producer.send(msg, callback) ,callback(回撥)它能準確地告訴你訊息是否真的提交成功了。

設定 acks = all。acks 是 Producer 的引數,如果設定成 all,需要所有副本 Broker 都要接收到訊息,該訊息才算是“已提交”。這是最高等級的“已提交”定義。

設定 retries 為一個較大的值。當出現網路的瞬時抖動時,訊息傳送可能會失敗,此時配置了 retries > 0 的 Producer 能夠自動重試訊息傳送,避免訊息丟失。

Consumer:

訊息消費完成再提交。Consumer 端有個引數 enable.auto.commit,最好把它設定成 false,並採用手動提交位移的方式。

broker :

設定 unclean.leader.election.enable = false。它控制的是哪些 Broker 有資格競選分割槽的 Leader。如果一個 Broker 落後原先的 Leader 太多,那麼它一旦成為新的 Leader,必然會造成訊息的丟失。故一般都要將該引數設定成 false,即不允許這種情況的發生。

設定 replication.factor >= 3,目前防止訊息丟失的主要機制就是冗餘。

設定 min.insync.replicas > 1,控制的是訊息至少要被寫入到多少個副本才算是“已提交”。設定成大於 1 可以提升訊息永續性。在實際環境中千萬不要使用預設值 1。 確保 replication.factor > min.insync.replicas。如果兩者相等,那麼只要有一個副本掛機,整個分割槽就無法正常工作了。我們不僅要改善訊息的永續性,防止資料丟失,還要在不降低可用性的基礎上完成。推薦設定成 replication.factor = min.insync.replicas + 1。

22、kafka Replica:





 

本質就是一個只能追加寫訊息的提交日誌。根據 Kafka 副本機制的定義,同一個分割槽下的所有副本儲存有相同的訊息序列,這些副本分散儲存在不同的 Broker 上,從而能夠對抗部分 Broker 宕機帶來的資料不可用

3個特性:

第一,在 Kafka 中,副本分成兩類:領導者副本(Leader Replica)和追隨者副本(Follower Replica)。每個分割槽在建立時都要選舉一個副本,稱為領導者副本,其餘的副本自動稱為追隨者副本。

第二,Kafka 的副本機制比其他分散式系統要更嚴格一些。在 Kafka 中,追隨者副本是不對外提供服務的。這就是說,任何一個追隨者副本都不能響應消費者和生產者的讀寫請求。所有的請求都必須由領導者副本來處理,或者說,所有的讀寫請求都必須發往領導者副本所在的 Broker,由該 Broker 負責處理。追隨者副本不處理客戶端請求,它唯一的任務就是從領導者副本非同步拉取訊息,並寫入到自己的提交日誌中,從而實現與領導者副本的同步。

第三,當領導者副本掛掉了,或者說領導者副本所在的 Broker 宕機時,Kafka 依託於監控功能能夠實時感知到,並立即開啟新一輪的領導者選舉,從追隨者副本中選一個作為新的領導者。老 Leader 副本重啟回來後,只能作為追隨者副本加入到叢集中。

意義: 方便實現“Read-your-writes”

(1)含義:當使用生產者API向Kafka成功寫入訊息後,馬上使用訊息者API去讀取剛才生產的訊息。 (2)如果允許追隨者副本對外提供服務,由於副本同步是非同步的,就可能因為資料同步時間差,從而使客戶端看不到最新寫入的訊息。 B :方便實現單調讀(Monotonic Reads) (1)單調讀:對於一個消費者使用者而言,在多處訊息訊息時,他不會看到某條訊息一會存在,一會不存在。 (2)如果允許追隨者副本提供讀服務,由於訊息是非同步的,則多個追隨者副本的狀態可能不一致。若客戶端每次命中的副本不同,就可能出現一條訊息一會看到,一會看不到

23、ISR(In-Sync Replica Set)LEO&HW 機制:





 

HW(High Watermark)是所有副本中最小的LEO。

比如: 一個分割槽有3個副本,一個leader,2個follower。producer向leader寫了10條訊息,follower1從leader處拷貝了5條訊息,follower2從leader處拷貝了3條訊息,那麼leader副本的LEO就是10,HW=3;follower1副本的LEO就是5

HW作用:保證消費資料的一致性和副本資料的一致性 通過HW機制。leader處的HW要等所有follower LEO都越過了才會前移

ISR: 所有與leader副本保持一定程度同步的副本(包括leader副本在內)組成ISR(In-Sync Replicas)

1、Follower故障:

當follower掛掉之後,會被踢出ISR;

當follower恢復後,會讀取本地磁碟記錄的HW,然後截掉HW之後的部分,從HW開始從leader繼續同步資料,當該follower的LEO大於等於該partition的HW的時候,就是它追上leader的時候,會被重新加入到ISR中

2、Leader故障:

當leader故障之後,會從follower中選出新的leader,為保證多個副本之間的資料一致性,其餘的follower會將各自HW之後的部分截掉(新leader如果沒有那部分資料 follower就會截掉造成資料丟失),重新從leader開始同步資料,但是隻能保證副本之間的資料一致性,並不能保證資料不重複或丟失。



24、Consumer分割槽分配策略:





 

自定義分割槽策略:

你需要顯式地配置生產者端的引數partitioner.class。這個引數該怎麼設定呢?方法很簡單,在編寫生產者程式時,你可以編寫一個具體的類實現org.apache.kafka.clients.producer.Partitioner介面。這個介面也很簡單,只定義了兩個方法:partition()和close(),通常你只需要實現最重要的 partition 方法

int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster){

List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);

//隨機

//return ThreadLocalRandom.current().nextInt(partitions.size());

//按訊息鍵保序策略

//return Math.abs(key.hashCode()) % partitions.size();

//指定條件

return partitions.stream().filter(Predicate(指定條件))).map(PartitionInfo::partition).findAny().get();

}

25、kafka中一個不為人知的topic:

consumer_offsets:

老版本的Kafka會把位移資訊儲存在Zk中 ,但zk不適用於高頻的寫操作,這令zk叢集效能嚴重下降,在新版本中將位移資料作為一條條普通的Kafka訊息,提交至內部主題(_consumer_offsets)中儲存,實現高永續性和高頻寫操作。

位移主題每條訊息內容格式:Group ID,主題名,分割槽號

當Kafka叢集中的第一個Consumer程式啟動時,Kafka會自動建立位移主題。也可以手動建立 分割槽數依賴於Broker端的offsets.topic.num.partitions的取值,預設為50 副本數依賴於Broker端的offsets.topic.replication.factor的取值,預設為3

思考:

只要 Consumer 一直啟動著,它就會無限期地向位移主題寫入訊息,就算沒有新訊息進來 也會通過定時任務重複寫相同位移 最終撐爆磁碟?

Kafka 提供了專門的後臺執行緒定期地巡檢待 Compact 的主題,看看是否存在滿足條件的可刪除資料,這個後臺執行緒叫 Log Cleaner,對相同的key只保留最新的一條訊息。

26、Consumer Group Rebalance:

術語簡介:

Rebalance :就是讓一個 Consumer Group 下所有的 Consumer 例項就如何消費訂閱主題的所有分割槽達成共識的過程。

Coordinator:它專門為 Consumer Group 服務,負責為 Group 執行 Rebalance 以及提供位移管理和組成員管理等。

Consumer 端應用程式在提交位移時,其實是向 Coordinator 所在的 Broker 提交位移。同樣地,當 Consumer 應用啟動時,也是向 Coordinator 所在的 Broker 傳送各種請求,然後由 Coordinator 負責執行消費者組的註冊、成員管理記錄等元資料管理操作。

如何確定Coordinator位置 :partitionId=Math.abs(groupId.hashCode() % offsetsTopicPartitionCount) 比如(abs(627841412 % 50)=12 Coordinator就在 partitionId=12的Leader 副本所在的 Broker)。

Rebalance的危害:

Rebalance 影響 Consumer 端 TPS 這期間不會工作

Rebalance 很慢 Consumer越多 Rebalance時間越長

Rebalance 效率不高 需要所有成員參與

觸發 Rebalance場景:

組成員數量發生變化

訂閱主題數量發生變化

訂閱主題的分割槽數發生變化

如何避免 Rebalance:

設定 session.timeout.ms = 15s (session連線時間 預設10)

設定 heartbeat.interval.ms = 2s(心跳時間)

max.poll.interval.ms (取決你一批訊息處理時長 預設5分鐘)

要保證 Consumer 例項在被判定為“dead”之前,能夠傳送至少 3 輪的心跳請求,即 session.timeout.ms >= 3 * heartbeat.interval.ms。

27、Kafka 攔截器:

Kafka 攔截器分為生產者攔截器和消費者攔截器,可以應用於包括客戶端監控、端到端系統效能檢測、訊息審計等多種功能在內的場景。

例:生產者Interceptor





 

部分圖文資料地址

-- 極客時間 Kafka 核心技術與實戰

--google