分散式流處理元件-優化篇:Producer生產調優之核心引數

語言: CN / TW / HK

theme: nico highlight: tomorrow-night


💯 作者:謝先生。 2014年入行的程式猿。多年開發和架構經驗。專注於Java、雲原生、大資料等技術。從CRUD入行,負責過億級流量架構的設計和落地,解決了千萬級資料治理問題。

📖 微信公眾號、B站:搜尋「謝先生說技術」不定時更新 ~

📂 清單: goku-framework【定期開源】享閱讀II

前言

發句牢騷: - 最近真的累。年後第三天就去出差。3月回來馬上又投入到新專案中~ ┓(;´_`)┏ - 最近老危險了。專案都是別的組的~~

前面我們花了較長的時間對生產者Producer理論、Producer分割槽做了一個比較細緻的介紹。詳細大家在認真閱讀完前兩節的內容之後會對Kafka的生產者有一個比較清晰的認知。

其中我們需要重點掌握的內容是:Producer傳送訊息的過程,如果有不清楚的建議返回好好品味。 - 《分散式流處理元件-理論篇:Producer入門理論》

我們需要注意的是: - 在生產環境由於物理機器等資源配置的影響,也為了更大程度上保證資源的利用率,我們都會對各個元件進行適配。

而Kafka的Producer也是一樣的。在生產環境中也有很多需要注意的點。本章我們就來好好的聊一聊~

囉嗦兩句Producer的訊息傳送原理

為了讓大家清晰的理解優化點,我們簡單過一下Producer的傳送原理 已經明白的請略過~

如上圖所示,訊息資料通過主執行緒呼叫producer.send()將其傳送出去,其中經過攔截器、序列化器、分割槽器的層層加工之後,記錄緩衝區RecordAccumulator會將加工之後的訊息記錄新增到其中。而訊息也不是單純的存在於RecordAccumulator中,為了降低網路IO,Producer將其按照batch的形式進行存放。

RecordAccumulator當然不是無限大的,自然這裡就是我們的第一個優化點

而合理的對batch容量進行配置,就是我們所說的第二個優化點

訊息的大小也是我們需要考慮的重點,姑且算是一個優化點

而batch訊息也會根據分割槽器計算得到的分割槽號存在於對應的Deque雙端佇列中,所以他們的關係就是圖中一層包一層的樣子

當batch滿足某個條件或者訊息等待指定時間之後,sender執行緒被拉起,Sender程式將不斷從緩衝區取出資料,進而進入到另外一個階段

新的優化點:指定等待時間

從緩衝區拉取出來的資料會被封裝為Request物件,並且與快取區類似的是: - NetworkClient中同樣會存在一個類似緩衝區的存在:InFlightRequests。其中會按照分割槽對Request進行儲存。所以他們的邏輯關係其實是這樣的

2023-03-18-22-05-53

隨後進行傳送,而在預設情況下,如果Broker端一直沒有響應,每個分割槽下的Request只能存放5個請求。而超出的情況將會阻塞傳送邏輯。

訊息傳送成功後,將會清空原始資料。否則嘗試重試等操作

Producer高吞吐

注意: 本段屬於測試階段,多圖模式

前置條件

來吧,囉嗦完Producer傳送過程之後,就到了精彩的測試驗證環節。有句話需要重點說明下: - 以下驗證結果不屬銀彈,實際生產中引數配置如何: 還是需要結合實際業務場景和資源配置給出最佳引數

機器配置: - 3臺 2C4G 的虛擬機器 - Topic設定分割槽數為3,副本因子為2

測試指令碼

本次Producer吞吐量測試指令碼在Kafka中已經提供,我們在${KAFKA_HOME}/kafka_2.13-3.3.1/bin中可以找到kafka-producer-perf-test.sh。如下對執行引數進行簡單說明:

  • --throughput: 限制測試傳送訊息的最大吞吐量,-1表示不受限制
  • --num-records: 測試訊息的資料量。
  • --record-size: 每條訊息的大小。
  • --producer-props: Producer可配置引數資訊,我們也可以通過properties檔案的方式配置到--producer.config

預設引數負載情況

kafka-producer-perf-test.sh內部也是預設引數,如果機器配置不錯可以適當調整JVM引數

bash kafka-producer-perf-test.sh --topic newTopic_test001 --producer-props bootstrap.servers=master:9092,node01:9092,node02:9092 --throughput -1 --num-records 100000 --record-size 1024 傳送10W條訊息大小為1KB的訊息到newTopic_test001上,不設定吞吐限制

text 30421 records sent, 6083.0 records/sec (5.94 MB/sec), 2125.8 ms avg latency, 3695.0 ms max latency. 55005 records sent, 11001.0 records/sec (10.74 MB/sec), 3129.7 ms avg latency, 4864.0 ms max latency. 100000 records sent, 9042.408898 records/sec (8.83 MB/sec), 2719.28 ms avg latency, 4864.00 ms max latency, 2598 ms 50th, 4250 ms 95th, 4734 ms 99th, 4832 ms 99.9th. 輸出表示: - 成功消費100000記錄,吞吐量:10101.010101條/s (9.86 MB/sec) - 平均延遲2451.60ms,最大延遲4025.00ms - 50%訊息延遲2455ms, 95%訊息延遲3581ms, 99%訊息延遲3875ms, 99.9%訊息延遲4016ms。

引數介紹

下面要介紹的這幾個引數,在Producer端:任意一個都可以稱為王牌的存在。接下來我們就一一來看看:

這些引數在ProducerConfig中都有介紹,我們一一來看~

batch.size

每當傳送多個訊息時,為了提高客戶端和伺服器的效能,生產者將嘗試對多個訊息進行打包成批,保證這一批可以在同一個分割槽內。預設為16384【16KB】

為了儘可能的提高吞吐量,在實際生產中需要對傳送的訊息進行合理預估,根據實際情況選擇一個合理的大小,避免出現如下情況: - 單條訊息超過batch.size,Producer有可能不會處理此訊息 - batch.size過大,有可能會造成Producer端記憶體空間的浪費 - batch.size過小,頻繁的網路IO會降低Producer的吞吐

linger.ms

如果訊息遲遲沒有達到batch.size,那麼將嘗試等待linger.ms時間傳送。預設等待時間為0,也就是當訊息到達之後立即傳送

但實際上為了減少傳送的請求數量,在沒有負載的情況下也會延遲5ms的時間。所以這也不是絕對的立即傳送~

這兩個引數我們或多或少都介紹過,但是並沒有測試調整它們對吞吐量的影響。接下來我們就先來測試一波吧!!!

測試batch.sizelinger.ms對吞吐量的影響

對照以上預設測試結果,然後我們開始進行引數調整。

跟著我的節奏,先對單個引數進行調整,對比差異

  • batch.size=32768

bash kafka-producer-perf-test.sh --topic newTopic_test001 --producer-props bootstrap.servers=master:9092,node01:9092,node02:9092 batch.size=32768 --throughput -1 --num-records 100000 --record-size 1024

三次輸出結果顯示

```text 68697 records sent, 13733.9 records/sec (13.41 MB/sec), 900.0 ms avg latency, 1812.0 ms max latency. 100000 records sent, 14817.009927 records/sec (14.47 MB/sec), 1129.92 ms avg latency, 1957.00 ms max latency, 1167 ms 50th, 1884 ms 95th, 1932 ms 99th, 1951 ms 99.9th.

100000 records sent, 22311.468095 records/sec (21.79 MB/sec), 1036.42 ms avg latency, 1467.00 ms max latency, 1100 ms 50th, 1431 ms 95th, 1449 ms 99th, 1462 ms 99.9th.

100000 records sent, 21753.317381 records/sec (21.24 MB/sec), 1001.94 ms avg latency, 1646.00 ms max latency, 955 ms 50th, 1614 ms 95th, 1631 ms 99th, 1639 ms 99.9th. ```

對比預設結果,我們可以看到在吞吐量上已經有了非常明顯的提高

  • linger.ms=3000

bash kafka-producer-perf-test.sh --topic newTopic_test001 --producer-props bootstrap.servers=master:9092,node01:9092,node02:9092 linger.ms=3000 --throughput -1 --num-records 100000 --record-size 1024

三次輸出結果顯示

```text 60526 records sent, 12102.8 records/sec (11.82 MB/sec), 1632.5 ms avg latency, 2337.0 ms max latency. 100000 records sent, 12280.486307 records/sec (11.99 MB/sec), 1882.64 ms avg latency, 2591.00 ms max latency, 2050 ms 50th, 2553 ms 95th, 2571 ms 99th, 2584 ms 99.9th.

64966 records sent, 12990.6 records/sec (12.69 MB/sec), 1556.1 ms avg latency, 2757.0 ms max latency. 100000 records sent, 13540.961408 records/sec (13.22 MB/sec), 1719.67 ms avg latency, 2757.00 ms max latency, 1474 ms 50th, 2701 ms 95th, 2730 ms 99th, 2744 ms 99.9th.

71776 records sent, 14355.2 records/sec (14.02 MB/sec), 1506.2 ms avg latency, 2176.0 ms max latency. 100000 records sent, 14320.492625 records/sec (13.98 MB/sec), 1577.49 ms avg latency, 2176.00 ms max latency, 1668 ms 50th, 2134 ms 95th, 2154 ms 99th, 2170 ms 99.9th. ```

同樣還是要和預設測試結果進行對比,雖然吞吐量沒有配置batch.size的效果差異,但也不能說本次調整不重要

  • 合併引數測試

bash kafka-producer-perf-test.sh --topic newTopic_test001 --producer-props bootstrap.servers=master:9092,node01:9092,node02:9092 linger.ms=3000 batch.size=32768 --throughput -1 --num-records 100000 --record-size 1024

三次輸出結果顯示

text 100000 records sent, 21739.130435 records/sec (21.23 MB/sec), 1007.46 ms avg latency, 1428.00 ms max latency, 995 ms 50th, 1400 ms 95th, 1411 ms 99th, 1424 ms 99.9th. 100000 records sent, 23041.474654 records/sec (22.50 MB/sec), 952.25 ms avg latency, 1334.00 ms max latency, 919 ms 50th, 1302 ms 95th, 1318 ms 99th, 1328 ms 99.9th. 100000 records sent, 24271.844660 records/sec (23.70 MB/sec), 916.17 ms avg latency, 1318.00 ms max latency, 912 ms 50th, 1278 ms 95th, 1309 ms 99th, 1314 ms 99.9th.

已經可以看出對比了吧。 我們繼續~~~

compression.type

該引數對Producer生產的資料進行壓縮,主要針對批資料壓縮。預設是none【無壓縮】,可以用來設定的值: - gzip - snappy - lz4 - zstd

接下來我們直接測試這幾個壓縮演算法的效能吧,為了方便寫個指令碼

```bash

!/bin/bash

for i in gzip snappy lz4 zstd do for ((j=0; j<3; j++)) do echo "----$i方式的第$j次測試----"

    kafka-producer-perf-test.sh --topic newTopic_test002 --producer-props bootstrap.servers=master:9092,node01:9092,node02:9092 batch.size=65536 compression.type=$i --throughput -1 --num-records 100000  --record-size 1024

done

done ```

檢視每個級別的三次輸出

text ----gzip方式的第0次測試---- 100000 records sent, 25425.883549 records/sec (24.83 MB/sec), 136.45 ms avg latency, 744.00 ms max latency, 97 ms 50th, 451 ms 95th, 677 ms 99th, 742 ms 99.9th. ----gzip方式的第1次測試---- 100000 records sent, 27019.724399 records/sec (26.39 MB/sec), 54.22 ms avg latency, 291.00 ms max latency, 27 ms 50th, 174 ms 95th, 228 ms 99th, 288 ms 99.9th. ----gzip方式的第2次測試---- 100000 records sent, 27940.765577 records/sec (27.29 MB/sec), 21.55 ms avg latency, 267.00 ms max latency, 11 ms 50th, 75 ms 95th, 97 ms 99th, 130 ms 99.9th. ===========分割線============= ----snappy方式的第0次測試---- 100000 records sent, 34153.005464 records/sec (33.35 MB/sec), 518.28 ms avg latency, 1047.00 ms max latency, 479 ms 50th, 969 ms 95th, 1029 ms 99th, 1044 ms 99.9th. ----snappy方式的第1次測試---- 100000 records sent, 32765.399738 records/sec (32.00 MB/sec), 474.20 ms avg latency, 985.00 ms max latency, 476 ms 50th, 921 ms 95th, 971 ms 99th, 984 ms 99.9th. ----snappy方式的第2次測試---- 100000 records sent, 34578.146611 records/sec (33.77 MB/sec), 450.48 ms avg latency, 932.00 ms max latency, 442 ms 50th, 869 ms 95th, 911 ms 99th, 928 ms 99.9th. ===========分割線============= ----lz4方式的第0次測試---- 100000 records sent, 34059.945504 records/sec (33.26 MB/sec), 474.75 ms avg latency, 871.00 ms max latency, 461 ms 50th, 818 ms 95th, 852 ms 99th, 867 ms 99.9th. ----lz4方式的第1次測試---- 100000 records sent, 37174.721190 records/sec (36.30 MB/sec), 408.28 ms avg latency, 769.00 ms max latency, 418 ms 50th, 716 ms 95th, 753 ms 99th, 767 ms 99.9th. ----lz4方式的第2次測試---- 100000 records sent, 32404.406999 records/sec (31.64 MB/sec), 483.38 ms avg latency, 1145.00 ms max latency, 372 ms 50th, 1077 ms 95th, 1122 ms 99th, 1142 ms 99.9th. ===========分割線============= ----zstd方式的第0次測試---- 100000 records sent, 51975.051975 records/sec (50.76 MB/sec), 57.53 ms avg latency, 279.00 ms max latency, 54 ms 50th, 108 ms 95th, 141 ms 99th, 158 ms 99.9th. ----zstd方式的第1次測試---- 100000 records sent, 47755.491882 records/sec (46.64 MB/sec), 66.12 ms avg latency, 333.00 ms max latency, 49 ms 50th, 208 ms 95th, 316 ms 99th, 332 ms 99.9th. ----zstd方式的第2次測試---- 100000 records sent, 51203.277010 records/sec (50.00 MB/sec), 60.27 ms avg latency, 265.00 ms max latency, 49 ms 50th, 148 ms 95th, 172 ms 99th, 193 ms 99.9th. ===========分割線=============

根據本次測試,而且測試命令也運行了多次,確實是zstd較好。 而我們再看看官網給出的說明: 2023-03-18-22-04-33

留個坑,下來我重新試試~~~

acks

前面章節中其實我們介紹過acks這個引數。

我們再囉嗦一下acks的可設定值

  • "0": 當訊息呼叫send()傳送出去之後就表示訊息已經發送成功,不管訊息是否已經到達broker
  • "1": 訊息傳送後,Leader接收到訊息並記錄到本地之後,不需要同步資料到副本就能進行ack返回
  • "all": 當訊息在Leader接收記錄,並且等待副本資料同步完成之後,才會返回ack。 該級別也屬於Java#Producer的預設配置

接下來我們針對以上三種級別來做一個效果驗證

基本就是預設配置,只不過多了一個acks的配置項 ```bash

!/bin/bash

for i in 0 1 all do echo "---------------acks=$i------------------" for ((j=0; j<3; j++)) do kafka-producer-perf-test.sh --topic newTopic_test001 --producer-props bootstrap.servers=master:9092,node01:9092,node02:9092 acks=$i --throughput -1 --num-records 100000 --record-size 1024 done done ```

檢視每個級別的三次輸出

有資料有真相 ```text --------------acks=0------------------ 100000 records sent, 43878.894252 records/sec (42.85 MB/sec), 60.90 ms avg latency, 367.00 ms max latency, 20 ms 50th, 218 ms 95th, 297 ms 99th, 363 ms 99.9th. 100000 records sent, 38699.690402 records/sec (37.79 MB/sec), 121.56 ms avg latency, 823.00 ms max latency, 60 ms 50th, 469 ms 95th, 736 ms 99th, 814 ms 99.9th. 100000 records sent, 37650.602410 records/sec (36.77 MB/sec), 127.71 ms avg latency, 441.00 ms max latency, 100 ms 50th, 351 ms 95th, 403 ms 99th, 440 ms 99.9th. ---------------acks=1------------------ 100000 records sent, 30284.675954 records/sec (29.57 MB/sec), 387.87 ms avg latency, 897.00 ms max latency, 340 ms 50th, 799 ms 95th, 855 ms 99th, 893 ms 99.9th. 100000 records sent, 36995.930448 records/sec (36.13 MB/sec), 247.35 ms avg latency, 826.00 ms max latency, 199 ms 50th, 703 ms 95th, 789 ms 99th, 813 ms 99.9th. 100000 records sent, 37425.149701 records/sec (36.55 MB/sec), 394.01 ms avg latency, 850.00 ms max latency, 389 ms 50th, 784 ms 95th, 814 ms 99th, 841 ms 99.9th. ---------------acks=all------------------ 76362 records sent, 15272.4 records/sec (14.91 MB/sec), 1422.4 ms avg latency, 1980.0 ms max latency. 100000 records sent, 16084.928422 records/sec (15.71 MB/sec), 1463.71 ms avg latency, 1980.00 ms max latency, 1628 ms 50th, 1903 ms 95th, 1960 ms 99th, 1974 ms 99.9th.

81151 records sent, 16223.7 records/sec (15.84 MB/sec), 1404.0 ms avg latency, 2201.0 ms max latency. 100000 records sent, 16906.170752 records/sec (16.51 MB/sec), 1397.82 ms avg latency, 2201.00 ms max latency, 1491 ms 50th, 2001 ms 95th, 2136 ms 99th, 2193 ms 99.9th.

88156 records sent, 17631.2 records/sec (17.22 MB/sec), 1245.0 ms avg latency, 1688.0 ms max latency. 100000 records sent, 17969.451932 records/sec (17.55 MB/sec), 1255.73 ms avg latency, 1688.00 ms max latency, 1403 ms 50th, 1598 ms 95th, 1635 ms 99th, 1668 ms 99.9th. ```

所以: - 如果是類似日誌、行為等不重要的訊息,建議將acks設定為0. - 其他的就根據訊息的安全程度來進行合理的選擇吧~

下期預告

本期針對Producer調優引數的介紹和測試對比到這裡就已經結束了。還是一句話: - 從資料看結果:實際生產中進行測試,選擇合理的配置引數資訊是必須得~

下一期針對資料可靠性我們來做一個詳細的介紹。 期待~