Kafka生產問題總結及效能優化實踐

語言: CN / TW / HK

theme: geek-black

Kafka生產問題總結及效能優化實踐

Kafka線上JVM調優

詳細連結: https://www.processon.com/view/link/628753b51efad466f3421903

Kafka億級流量調優.png

  • JVM引數設定

  • Kafka是scala語言開發,執行在JVM上,需要對JVM引數合理設定

    • 修改bin/kafka-start-server.sh中的jvm設定,假設機器是32G記憶體,可以如下設定

    shell export KAFKA_HEAP_OPTS="-Xmx16G -Xms16G -Xmn10G -XX:MetaspaceSize=256M -XX:+UseG1GC -XX:MaxGCPauseMillis=50 -XX:G1HeapRegionSize=16M"

  • 這種大記憶體的情況一般都是要用G1垃圾收集器,因為年輕代記憶體比較大,用G1可以設定GC最大停頓時間,不至於一次Minor GC就花費太長時間。當然像Kafka、RocketMQ、ES這些中介軟體,寫資料到磁碟會用到作業系統的page cache,所以JVM記憶體不適合分配過大,需要給作業系統預留幾個G快取

線上問題及優化

訊息丟失

  • 訊息傳送端
  • acks=0: 表示producer不需要等待任何broker確認收到訊息的回覆,就可以繼續傳送下一條訊息。效能最高,但是最容易丟訊息。大資料統計報表場景,對效能要求很高,對資料丟失不敏感的情況可以用這種
  • acks=1: 至少要等待leader已經成功將資料寫入本地log,但是不需要等待所有follower是否成功寫入。就可以繼續傳送下一條訊息。這種情況下,如果follower沒有成功備份資料,而此時leader又掛掉,則訊息會丟失
  • acks=-1或all: 這意味著leader需要等待所有備份(min.insync.replicas配置的備份個數)都成功寫入日誌,這種策略會保證只要有一個備份存活就不會丟失資料。這是最強的資料保證。一般除非是金融級別,或跟錢打交道的場景才會使用這種配置。當然如果min.insync.replicas配置的是1則也可能丟訊息,跟acks=1情況類似
  • 訊息消費端
  • 如果消費這邊配置的是自動提交,萬一消費到資料還沒處理完,就自動提交offset了,但是此時你consumer直接宕機了,未處理完的資料丟失了,下次也消費不到了

訊息重複消費

  • 訊息傳送端
  • 傳送訊息如果配置了重試機制,比如網路抖動時間過長導致傳送端傳送超時,實際broker可能已經接收到訊息,但傳送方會重新發送訊息
  • 訊息消費端
  • 如果消費這邊配置的是自動提交,剛拉取了一批資料處理了一部分,但還沒來得及提交,服務掛了,下次重啟又會拉取相同的一批資料重複處理,一般消費端都是要做消費冪等處理的

冪等性: 一個數據或者一個請求,重複來多次,確保對應的資料是不會改變的,不能出錯

常見介面冪等性解決方案

  • 如果是寫Redis就沒問題,反正每次都是set,天然冪等性
  • 全域性唯一ID: 生產者傳送訊息的時候帶上一個全域性唯一id,消費者拿到訊息後,先根據這個id去Redis裡查一下之前有沒有消費過,沒有消費過就處理,並且寫入這個id到Redis,知道消費過了,則不處理
  • 基於資料庫的唯一鍵
  • 去重表: 將處理過的東西放在一個表中,在下次再來的時候,先判斷有沒有處理過,如果已經處理過了,就不用再處理
  • 樂觀鎖: 每次更新資料都要跟之前的版本進行比對
  • 悲觀鎖
  • 狀態機: 很多業務中多有多個狀態,比如訂單的狀態有提交、待支付、已支付、取消、退款等等狀態。後端可以根據不同的狀態去保證冪等性,比如在退款的時候,一定要保證這筆訂單是已支付的狀態

消費亂序

  • 如果傳送端配置了重試機制,kafka不會等之前那條訊息完全傳送成功才去傳送下一條訊息,這樣可能會出現,傳送了1,2,3條訊息,第一條超時了,後面兩條傳送成功,再重試傳送第1條訊息,這時訊息在broker端的順序就是2,3,1了,所以,是否一定要配置重試要根據業務情況而定。也可以用同步傳送的模式去發訊息,當然acks不能設定為0,這樣也能保證訊息傳送的有序。kafka保證全鏈路訊息順序消費,需要從傳送端開始,將所有有序訊息傳送到同一個分割槽,然後用一個消費者去消費,但是這種效能比較低,可以在消費者端接收到訊息後將需要保證順序消費的幾條消費發到記憶體佇列(可以搞多個),一個記憶體佇列開啟一個執行緒順序處理訊息

  • 問題: 支付、消費、下單三步驟產生消費亂序,如何解決

  • 方式一: 可以將訊息全部用countdownlatch收集過來,然後根據業務進行內部排序處理之後,再進行處理
  • 方式二:根據不同型別,分發到各自對應型別的記憶體佇列,然後集中到一個佇列去處理

訊息積壓

  • 線上有時因為傳送方傳送訊息速度過快,或者消費方處理訊息過慢,可能會導致broker積壓大量未消費訊息。
  • 此種情況如果積壓了上百萬未消費訊息需要緊急處理,可以修改消費端程式,讓其將收到的訊息快速轉發到其他topic(可以設定很多分割槽),然後再啟動多個消費者同時消費新主題的不同分割槽
  • 由於訊息資料格式變動或消費者程式有bug,導致消費者一直消費不成功,也可能導致broker積壓大量未消費訊息。
  • 此種情況可以將這些消費不成功的訊息轉發到其它佇列裡去(類似死信佇列),後面再慢慢分析死信佇列裡的訊息處理問題
  • 注意:Kafka是沒有自己實現死信佇列的,需要自己新建一個Topic作為死信佇列

延時佇列

  • 延時佇列儲存的物件是延時訊息,所謂的“延時訊息”是指訊息被髮送以後,並不想讓消費者立刻獲取,而是等待特定的時間後,消費者才能獲取這個訊息進行消費,延時佇列的使用場景有很多,比如
  • 在訂單系統中, 一個使用者下單之後通常有 30 分鐘的時間進行支付,如果 30 分鐘之內沒有支付成功,那麼這個訂單將進行異常處理,這時就可以使用延時佇列來處理這些訂單了
  • 訂單完成1小時後通知使用者進行評價
  • 實現思路
  • 傳送延時訊息時先把訊息按照不同的延遲時間段傳送到指定的佇列中(topic_1s,topic_5s,topic_10s,...topic_2h,這個一般不能支援任意時間段的延時),然後通過定時器進行輪訓消費這些topic,檢視訊息是否到期,如果到期就把這個訊息傳送到具體業務處理的topic中,佇列中訊息越靠前的到期時間越早,具體來說就是定時器在一次消費過程中,對訊息的傳送時間做判斷,看下是否延遲到對應時間了,如果到了就轉發,如果還沒到這一次定時任務就可以提前結束了

訊息回溯

如果某段時間對已消費訊息計算的結果覺得有問題,可能是由於程式bug導致的計算錯誤,當程式bug修復後,這時可能需要對之前已消費的訊息重新消費,可以指定從多久之前的訊息回溯消費,這種可以用consumer的offsetsForTimes、seek等方法指定從某個offset偏移的訊息開始消費

分割槽數越多吞吐量越高嗎

  • RabbitMQ和Kafka、RocketMQ最大的區別:分散式儲存

  • 可以用kafka壓測工具自己測試分割槽數不同,各種情況下的吞吐量

shell # 往test裡傳送一百萬訊息,每條設定1KB # throughput 用來進行限流控制,當設定的值小於 0 時不限流,當設定的值大於 0 時,當傳送的吞吐量大於該值時就會被阻塞一段時間 bin/kafka-producer-perf-test.sh --topic test --num-records 1000000 --record-size 1024 --throughput -1 --producer-props bootstrap.servers=192.168.65.60:9092 acks=1

  • 網路上很多資料都說分割槽數越多吞吐量越高 , 但從壓測結果來看,分割槽數到達某個值吞吐量反而開始下降,實際上很多事情都會有一個臨界值,當超過這個臨界值之後,很多原本符合既定邏輯的走向又會變得不同。一般情況分割槽數跟叢集機器數量相當就差不多了

  • 當然吞吐量的數值和走勢還會和磁碟、檔案系統、 I/O排程策略等因素相關

  • 注意
  • 如果分割槽數設定過大,比如設定10000,可能會設定不成功,後臺會報錯"java.io.IOException : Too many open files"
  • 異常中最關鍵的資訊是“ Too many open flies”,這是一種常見的 Linux 系統錯誤,通常意味著檔案描述符不足,它一般發生在建立執行緒、建立 Socket、開啟檔案這些場景下 。 在 Linux系統的預設設定下,這個檔案描述符的個數不是很多 ,通過 ulimit -n 命令可以檢視:一般預設是1024,可以將該值增大,比如:ulimit -n 65535

訊息傳遞保障

  • at most once(消費者最多收到一次訊息,0--1次):acks = 0 可以實現。
  • at least once(消費者至少收到一次訊息,1--多次):ack = all 可以實現。
  • exactly once(消費者剛好收到一次訊息):at least once 加上消費者冪等性可以實現,還可以用kafka生產者的冪等性來實現

注意

  • kafka生產者的冪等性:因為傳送端重試導致的訊息重複傳送問題,kafka的冪等性可以保證重複傳送的訊息只接收一次,只需在生產者加上引數 props.put(“enable.idempotence”, true) 即可,預設是false不開啟
  • 具體實現原理是: kafka每次傳送訊息會生成PID和Sequence Number,並將這兩個屬性一起傳送給broker,broker會將PID和Sequence Number跟訊息繫結一起存起來,下次如果生產者重發相同訊息,broker會檢查PID和Sequence Number,如果相同不會再接收
  • PID: 每個新的 Producer 在初始化的時候會被分配一個唯一的 PID,這個PID 對使用者完全是透明的,生產者如果重啟則會生成新的PID
  • Sequence Number:對於每個 PID,該 Producer 傳送到每個 Partition 的資料都有對應的序列號,這些序列號是從0開始單調遞增的

kafka的事務

  • Kafka的事務不同於Rocketmq,Rocketmq是保障本地事務(比如資料庫)與mq訊息傳送的事務一致性Kafka的事務主要是保障一次傳送多條訊息的事務一致性(要麼同時成功要麼同時失敗),一般在kafka的流式計算場景用得多一點,比如,kafka需要對一個topic裡的訊息做不同的流式計算處理,處理完分別發到不同的topic裡,這些topic分別被不同的下游系統消費(比如hbase,redis,es等),這種我們肯定希望系統傳送到多個topic的資料保持事務一致性。

  • Kafka要實現類似Rocketmq的分散式事務需要額外開發功能

  • kafka的事務處理可以參考官方文件

kafka高效能的原因

  • 磁碟順序讀寫:kafka訊息不能修改以及不會從檔案中間刪除保證了磁碟順序讀,kafka的訊息寫入檔案都是追加在檔案末尾,不會寫入檔案中的某個位置(隨機寫)保證了磁碟順序寫。
  • 讀寫資料的批量batch處理以及壓縮傳輸
  • 資料傳輸的零拷貝

資料傳輸零拷貝原理.png