kafka的訊息持久化檔案

語言: CN / TW / HK

最近排查kafka的問題,涉及到了kafka的訊息儲存,本文就相關內容進行總結。

我們都知道,topic是有分割槽(partition)的概念的, 生產者往同一個topic傳送的訊息最終是傳送到了不同的分割槽裡面。也就是說,一個topic裡的訊息是由該topic下所有分割槽裡的訊息組成的在同一個分割槽裡,訊息是有序的,而不同分割槽中,訊息是不能保證有序的。

有了這個簡單認識後,自然可以知道,每個分割槽僅會儲存該分割槽下的訊息。在配置("log.dirs")指定的目錄下,有很多以"$topic-$partition"為名稱的目錄,在每個這樣的目錄下,就存放對應"topic",對應"partition"訊息的持久化檔案。

注:由於每個分割槽都有leader的概念,而不同分割槽的leader可能位於不同的broker上,除了leader之外,分割槽還有副本(replica)的概念,因此每個broker只會儲存分割槽leader或副本位於該broker中的topic的訊息

在《kafka客戶端訊息傳送邏輯》一文中提到了,生產者傳送訊息時,其實是一批(batch)一批來發送的,一批訊息中可能包含一條或多條訊息。同時,上面也提到了,同一個topic的同一個分割槽裡的訊息是有序的。有序的訊息通常會有一個偏移量的概念。kafka內部對訊息持久化儲存時,也遵循類似的理念,按批次儲存,同時記錄訊息的偏移位置,以及訊息的時間戳等資訊。

在具體實現中,一個分割槽內的訊息,劃分為多個segment,segment是一個邏輯概念,一個segment對應一個訊息段,一個訊息段中又包含一批或多批訊息(如下圖中的RecordBatch),一批訊息就是客戶端按batch組裝傳送過來的訊息集,包含一條或多條訊息(如下圖中的Record)。

一個segment由三個檔案組成,分別為訊息檔案(*.log)儲存具體的訊息內容、訊息索引檔案(*.index)儲存訊息在分割槽中的索引、訊息時間戳索引檔案(*.timeindex)則儲存了訊息對應的時間戳。這三個檔案均以檔案中儲存的首個訊息在分割槽中的偏移量作為檔名的字首。

接下來就分別講述下這幾個檔案的具體格式。

1) *.log

log檔案中的內容就是一個segment中實際包含的訊息然後按批次進行儲存。每一批訊息都包含固定位元組長度的頭部資訊,以及一到多條訊息。在頭部資訊中儲存了基準偏移(BaseOffset),即該批次中的第一條訊息在整個分割槽中的偏移位置;長度(Length);分割槽leader的epoch(LeaderEpoch);用於指定訊息儲存格式的魔數(Magic);校驗和(CRC);訊息批屬性(Attributes);該批次最後一條訊息與第一條訊息的相對偏移(LastOffsetDelta);第一條訊息的時間戳(FirstTimestamp);該批次的訊息數(RecordCount)等內容。

而每條訊息則記錄了訊息的整體長度、屬性、訊息的key、實際內容、頭資訊等。

需要注意的是:在訊息的儲存格式中,除屬性欄位固定1位元組外,其他資訊均採用zigzag的編碼方式這樣可以有效壓縮儲存空間。

以一個實際檔案內容,對照上面的方式解析來檢視下,檔案的開頭如下所示:

對照上面的格式可以得到:

0000 0000 0000 0000  BaseOffset:  0
0000 3cb8            Length: 15544
0000 0000            LeaderEpoch: 0
02                   Magic: 2
be2a b271            CRC: 3190469233    
0000                 Attributes:  
0000 000e            LastOffsetDelta: 14
0000 0185 7572 a380  FirstTimestamp: 1672712725376
0000 0185 7572 a387  MaxTimestamp: 1672712725383
ffff ffff ffff ffff  ProducerId: -1
ffff                 ProducerEpoch: -1
ffff ffff            BaseSequence: -1
0000 000f            recordCount: 15

中間為15條訊息的內容,這裡省略不分析,再之後,可以看到第二批訊息的頭資訊:

同樣,按照前面的格式可以分析可以得到:

0000 0000 0000 000f  BaseOffset:  15
0000 3cb8            Length: 15544
0000 0000            LeaderEpoch: 0
02                   Magic: 2
e3ff 46f2            CRC: 3825157874    
0000                 Attributes:  
0000 000e            LastOffsetDelta: 14
0000 0185 7572 a387  FirstTimestamp: 1672712725383
0000 0185 7572 a388  MaxTimestamp: 1672712725384
ffff ffff ffff ffff  ProducerId: -1
ffff                 ProducerEpoch: -1
ffff ffff            BaseSequence: -1
0000 000f            recordCount:     15

後面的內容可以依次類推。另外, 通過自帶命令也能和實際解析的內容對得上:

[root@kafka-0 bin]$ kafka-run-class.sh kafka.tools.DumpLogSegments --files /opt/data/kafka/hncscwc-0/00000000000000000000.log
Dumping /opt/data/kafka/hncscwc-0/00000000000000000000.log
Starting offset: 0
baseOffset: 0 lastOffset: 14 count: 15 baseSequence: -1 lastSequence: -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 0 isTransactional: false isControl: false position: 0 CreateTime: 1672712725383 isvalid: true size: 15556 magic: 2 compresscodec: NONE crc: 3190469233
baseOffset: 15 lastOffset: 29 count: 15 baseSequence: -1 lastSequence: -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 0 isTransactional: false isControl: false position: 15556 CreateTime: 1672712725384 isvalid: true size: 15556 magic: 2 compresscodec: NONE crc: 3825157874
baseOffset: 30 lastOffset: 44 count: 15 baseSequence: -1 lastSequence: -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 0 isTransactional: false isControl: false position: 31112 CreateTime: 1672712725385 isvalid: true size: 15556 magic: 2 compresscodec: NONE crc: 2410094720
baseOffset: 45 lastOffset: 59 count: 15 baseSequence: -1 lastSequence: -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 0 isTransactional: false isControl: false position: 46668 CreateTime: 1672712725385 isvalid: true size: 15556 magic: 2 compresscodec: NONE crc: 3333194410
baseOffset: 60 lastOffset: 74 count: 15 baseSequence: -1 lastSequence: -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 0 isTransactional: false isControl: false position: 62224 CreateTime: 1672712725386 isvalid: true size: 15556 magic: 2 compresscodec: NONE crc: 2980189431
baseOffset: 75 lastOffset: 89 count: 15 baseSequence: -1 lastSequence: -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 0 isTransactional: false isControl: false position: 77780 CreateTime: 1672712725388 isvalid: true size: 15556 magic: 2 compresscodec: NONE crc: 3334415833
。。。

注:對於採用事務方式寫入的資料,這裡暫不舉例說明。

2)*.index

該檔案記錄了訊息在log檔案中的起始偏移位置。其檔案格式比較簡單,由多個條目組成, 每個條目固定4位元組的訊息偏移量加固定4位元組的檔案偏移量。

實際檔案內容示例如下圖所示:

同樣,通過自帶命令也能和上述分析內容對上:

[root@kafka-0 bin]$ kafka-run-class.sh kafka.tools.DumpLogSegments --files /opt/data/kafka/hncscwc-0/00000000000000000000.index
Dumping /opt/data/kafka/hncscwc-0/00000000000000000000.index
offset: 29 position: 15556
offset: 44 position: 31112
offset: 59 position: 46668
offset: 74 position: 62224
offset: 89 position: 77780
offset: 104 position: 93336
offset: 119 position: 108892
offset: 134 position: 124448
offset: 149 position: 140004
offset: 164 position: 155560
offset: 179 position: 171116
offset: 194 position: 186672
offset: 209 position: 202228
offset: 224 position: 217784
offset: 239 position: 233340
offset: 254 position: 248896

3)*.timeindex

timestamp是從0.10版本開始引入的功能,每條訊息都有一個對應的時間戳。生產者可以配置設定時間戳的型別,預設為建立時間(另外一個可選值是日誌追加時間,即寫入的時間)

該檔案記錄了不同時間戳對應的訊息的偏移。檔案格式和index一樣,由多個條目組成,每個條目為固定8位元組的時間戳加固定4位元組的偏移量構成。這裡就不再實際舉例說明了。


小結一下,本文主要分析了kafka訊息的持久化檔案,以及具體的檔案格式。由興趣的朋友也可以對照分析下,對於kafka具體將訊息寫入的時機是怎樣的,如何決定應該將訊息寫入新的segment。訊息的讀取邏輯又是怎樣的,後續再結合原始碼進行剖析。

好了,這就是本文的全部內容,如果覺得本文對您有幫助,請點贊+轉發,也歡迎加我微信交流~

本文分享自微信公眾號 - 陳猿解碼(gh_383bc7486c1a)。
如有侵權,請聯絡 [email protected] 刪除。
本文參與“OSC源創計劃”,歡迎正在閱讀的你也加入,一起分享。