分布式流处理组件-优化篇:Producer生产调优之核心参数

语言: CN / TW / HK

theme: nico highlight: tomorrow-night


💯 作者:谢先生。 2014年入行的程序猿。多年开发和架构经验。专注于Java、云原生、大数据等技术。从CRUD入行,负责过亿级流量架构的设计和落地,解决了千万级数据治理问题。

📖 微信公众号、B站:搜索「谢先生说技术」不定时更新 ~

📂 清单: goku-framework【定期开源】享阅读II

前言

发句牢骚: - 最近真的累。年后第三天就去出差。3月回来马上又投入到新项目中~ ┓(;´_`)┏ - 最近老危险了。项目都是别的组的~~

前面我们花了较长的时间对生产者Producer理论、Producer分区做了一个比较细致的介绍。详细大家在认真阅读完前两节的内容之后会对Kafka的生产者有一个比较清晰的认知。

其中我们需要重点掌握的内容是:Producer发送消息的过程,如果有不清楚的建议返回好好品味。 - 《分布式流处理组件-理论篇:Producer入门理论》

我们需要注意的是: - 在生产环境由于物理机器等资源配置的影响,也为了更大程度上保证资源的利用率,我们都会对各个组件进行适配。

而Kafka的Producer也是一样的。在生产环境中也有很多需要注意的点。本章我们就来好好的聊一聊~

啰嗦两句Producer的消息发送原理

为了让大家清晰的理解优化点,我们简单过一下Producer的发送原理 已经明白的请略过~

如上图所示,消息数据通过主线程调用producer.send()将其发送出去,其中经过拦截器、序列化器、分区器的层层加工之后,记录缓冲区RecordAccumulator会将加工之后的消息记录添加到其中。而消息也不是单纯的存在于RecordAccumulator中,为了降低网络IO,Producer将其按照batch的形式进行存放。

RecordAccumulator当然不是无限大的,自然这里就是我们的第一个优化点

而合理的对batch容量进行配置,就是我们所说的第二个优化点

消息的大小也是我们需要考虑的重点,姑且算是一个优化点

而batch消息也会根据分区器计算得到的分区号存在于对应的Deque双端队列中,所以他们的关系就是图中一层包一层的样子

当batch满足某个条件或者消息等待指定时间之后,sender线程被拉起,Sender程序将不断从缓冲区取出数据,进而进入到另外一个阶段

新的优化点:指定等待时间

从缓冲区拉取出来的数据会被封装为Request对象,并且与缓存区类似的是: - NetworkClient中同样会存在一个类似缓冲区的存在:InFlightRequests。其中会按照分区对Request进行存储。所以他们的逻辑关系其实是这样的

2023-03-18-22-05-53

随后进行发送,而在默认情况下,如果Broker端一直没有响应,每个分区下的Request只能存放5个请求。而超出的情况将会阻塞发送逻辑。

消息发送成功后,将会清空原始数据。否则尝试重试等操作

Producer高吞吐

注意: 本段属于测试阶段,多图模式

前置条件

来吧,啰嗦完Producer发送过程之后,就到了精彩的测试验证环节。有句话需要重点说明下: - 以下验证结果不属银弹,实际生产中参数配置如何: 还是需要结合实际业务场景和资源配置给出最佳参数

机器配置: - 3台 2C4G 的虚拟机 - Topic设置分区数为3,副本因子为2

测试脚本

本次Producer吞吐量测试脚本在Kafka中已经提供,我们在${KAFKA_HOME}/kafka_2.13-3.3.1/bin中可以找到kafka-producer-perf-test.sh。如下对执行参数进行简单说明:

  • --throughput: 限制测试发送消息的最大吞吐量,-1表示不受限制
  • --num-records: 测试消息的数据量。
  • --record-size: 每条消息的大小。
  • --producer-props: Producer可配置参数信息,我们也可以通过properties文件的方式配置到--producer.config

默认参数负载情况

kafka-producer-perf-test.sh内部也是默认参数,如果机器配置不错可以适当调整JVM参数

bash kafka-producer-perf-test.sh --topic newTopic_test001 --producer-props bootstrap.servers=master:9092,node01:9092,node02:9092 --throughput -1 --num-records 100000 --record-size 1024 发送10W条消息大小为1KB的消息到newTopic_test001上,不设置吞吐限制

text 30421 records sent, 6083.0 records/sec (5.94 MB/sec), 2125.8 ms avg latency, 3695.0 ms max latency. 55005 records sent, 11001.0 records/sec (10.74 MB/sec), 3129.7 ms avg latency, 4864.0 ms max latency. 100000 records sent, 9042.408898 records/sec (8.83 MB/sec), 2719.28 ms avg latency, 4864.00 ms max latency, 2598 ms 50th, 4250 ms 95th, 4734 ms 99th, 4832 ms 99.9th. 输出表示: - 成功消费100000记录,吞吐量:10101.010101条/s (9.86 MB/sec) - 平均延迟2451.60ms,最大延迟4025.00ms - 50%消息延迟2455ms, 95%消息延迟3581ms, 99%消息延迟3875ms, 99.9%消息延迟4016ms。

参数介绍

下面要介绍的这几个参数,在Producer端:任意一个都可以称为王牌的存在。接下来我们就一一来看看:

这些参数在ProducerConfig中都有介绍,我们一一来看~

batch.size

每当发送多个消息时,为了提高客户端和服务器的性能,生产者将尝试对多个消息进行打包成批,保证这一批可以在同一个分区内。默认为16384【16KB】

为了尽可能的提高吞吐量,在实际生产中需要对发送的消息进行合理预估,根据实际情况选择一个合理的大小,避免出现如下情况: - 单条消息超过batch.size,Producer有可能不会处理此消息 - batch.size过大,有可能会造成Producer端内存空间的浪费 - batch.size过小,频繁的网络IO会降低Producer的吞吐

linger.ms

如果消息迟迟没有达到batch.size,那么将尝试等待linger.ms时间发送。默认等待时间为0,也就是当消息到达之后立即发送

但实际上为了减少发送的请求数量,在没有负载的情况下也会延迟5ms的时间。所以这也不是绝对的立即发送~

这两个参数我们或多或少都介绍过,但是并没有测试调整它们对吞吐量的影响。接下来我们就先来测试一波吧!!!

测试batch.sizelinger.ms对吞吐量的影响

对照以上默认测试结果,然后我们开始进行参数调整。

跟着我的节奏,先对单个参数进行调整,对比差异

  • batch.size=32768

bash kafka-producer-perf-test.sh --topic newTopic_test001 --producer-props bootstrap.servers=master:9092,node01:9092,node02:9092 batch.size=32768 --throughput -1 --num-records 100000 --record-size 1024

三次输出结果显示

```text 68697 records sent, 13733.9 records/sec (13.41 MB/sec), 900.0 ms avg latency, 1812.0 ms max latency. 100000 records sent, 14817.009927 records/sec (14.47 MB/sec), 1129.92 ms avg latency, 1957.00 ms max latency, 1167 ms 50th, 1884 ms 95th, 1932 ms 99th, 1951 ms 99.9th.

100000 records sent, 22311.468095 records/sec (21.79 MB/sec), 1036.42 ms avg latency, 1467.00 ms max latency, 1100 ms 50th, 1431 ms 95th, 1449 ms 99th, 1462 ms 99.9th.

100000 records sent, 21753.317381 records/sec (21.24 MB/sec), 1001.94 ms avg latency, 1646.00 ms max latency, 955 ms 50th, 1614 ms 95th, 1631 ms 99th, 1639 ms 99.9th. ```

对比默认结果,我们可以看到在吞吐量上已经有了非常明显的提高

  • linger.ms=3000

bash kafka-producer-perf-test.sh --topic newTopic_test001 --producer-props bootstrap.servers=master:9092,node01:9092,node02:9092 linger.ms=3000 --throughput -1 --num-records 100000 --record-size 1024

三次输出结果显示

```text 60526 records sent, 12102.8 records/sec (11.82 MB/sec), 1632.5 ms avg latency, 2337.0 ms max latency. 100000 records sent, 12280.486307 records/sec (11.99 MB/sec), 1882.64 ms avg latency, 2591.00 ms max latency, 2050 ms 50th, 2553 ms 95th, 2571 ms 99th, 2584 ms 99.9th.

64966 records sent, 12990.6 records/sec (12.69 MB/sec), 1556.1 ms avg latency, 2757.0 ms max latency. 100000 records sent, 13540.961408 records/sec (13.22 MB/sec), 1719.67 ms avg latency, 2757.00 ms max latency, 1474 ms 50th, 2701 ms 95th, 2730 ms 99th, 2744 ms 99.9th.

71776 records sent, 14355.2 records/sec (14.02 MB/sec), 1506.2 ms avg latency, 2176.0 ms max latency. 100000 records sent, 14320.492625 records/sec (13.98 MB/sec), 1577.49 ms avg latency, 2176.00 ms max latency, 1668 ms 50th, 2134 ms 95th, 2154 ms 99th, 2170 ms 99.9th. ```

同样还是要和默认测试结果进行对比,虽然吞吐量没有配置batch.size的效果差异,但也不能说本次调整不重要

  • 合并参数测试

bash kafka-producer-perf-test.sh --topic newTopic_test001 --producer-props bootstrap.servers=master:9092,node01:9092,node02:9092 linger.ms=3000 batch.size=32768 --throughput -1 --num-records 100000 --record-size 1024

三次输出结果显示

text 100000 records sent, 21739.130435 records/sec (21.23 MB/sec), 1007.46 ms avg latency, 1428.00 ms max latency, 995 ms 50th, 1400 ms 95th, 1411 ms 99th, 1424 ms 99.9th. 100000 records sent, 23041.474654 records/sec (22.50 MB/sec), 952.25 ms avg latency, 1334.00 ms max latency, 919 ms 50th, 1302 ms 95th, 1318 ms 99th, 1328 ms 99.9th. 100000 records sent, 24271.844660 records/sec (23.70 MB/sec), 916.17 ms avg latency, 1318.00 ms max latency, 912 ms 50th, 1278 ms 95th, 1309 ms 99th, 1314 ms 99.9th.

已经可以看出对比了吧。 我们继续~~~

compression.type

该参数对Producer生产的数据进行压缩,主要针对批数据压缩。默认是none【无压缩】,可以用来设置的值: - gzip - snappy - lz4 - zstd

接下来我们直接测试这几个压缩算法的性能吧,为了方便写个脚本

```bash

!/bin/bash

for i in gzip snappy lz4 zstd do for ((j=0; j<3; j++)) do echo "----$i方式的第$j次测试----"

    kafka-producer-perf-test.sh --topic newTopic_test002 --producer-props bootstrap.servers=master:9092,node01:9092,node02:9092 batch.size=65536 compression.type=$i --throughput -1 --num-records 100000  --record-size 1024

done

done ```

查看每个级别的三次输出

text ----gzip方式的第0次测试---- 100000 records sent, 25425.883549 records/sec (24.83 MB/sec), 136.45 ms avg latency, 744.00 ms max latency, 97 ms 50th, 451 ms 95th, 677 ms 99th, 742 ms 99.9th. ----gzip方式的第1次测试---- 100000 records sent, 27019.724399 records/sec (26.39 MB/sec), 54.22 ms avg latency, 291.00 ms max latency, 27 ms 50th, 174 ms 95th, 228 ms 99th, 288 ms 99.9th. ----gzip方式的第2次测试---- 100000 records sent, 27940.765577 records/sec (27.29 MB/sec), 21.55 ms avg latency, 267.00 ms max latency, 11 ms 50th, 75 ms 95th, 97 ms 99th, 130 ms 99.9th. ===========分割线============= ----snappy方式的第0次测试---- 100000 records sent, 34153.005464 records/sec (33.35 MB/sec), 518.28 ms avg latency, 1047.00 ms max latency, 479 ms 50th, 969 ms 95th, 1029 ms 99th, 1044 ms 99.9th. ----snappy方式的第1次测试---- 100000 records sent, 32765.399738 records/sec (32.00 MB/sec), 474.20 ms avg latency, 985.00 ms max latency, 476 ms 50th, 921 ms 95th, 971 ms 99th, 984 ms 99.9th. ----snappy方式的第2次测试---- 100000 records sent, 34578.146611 records/sec (33.77 MB/sec), 450.48 ms avg latency, 932.00 ms max latency, 442 ms 50th, 869 ms 95th, 911 ms 99th, 928 ms 99.9th. ===========分割线============= ----lz4方式的第0次测试---- 100000 records sent, 34059.945504 records/sec (33.26 MB/sec), 474.75 ms avg latency, 871.00 ms max latency, 461 ms 50th, 818 ms 95th, 852 ms 99th, 867 ms 99.9th. ----lz4方式的第1次测试---- 100000 records sent, 37174.721190 records/sec (36.30 MB/sec), 408.28 ms avg latency, 769.00 ms max latency, 418 ms 50th, 716 ms 95th, 753 ms 99th, 767 ms 99.9th. ----lz4方式的第2次测试---- 100000 records sent, 32404.406999 records/sec (31.64 MB/sec), 483.38 ms avg latency, 1145.00 ms max latency, 372 ms 50th, 1077 ms 95th, 1122 ms 99th, 1142 ms 99.9th. ===========分割线============= ----zstd方式的第0次测试---- 100000 records sent, 51975.051975 records/sec (50.76 MB/sec), 57.53 ms avg latency, 279.00 ms max latency, 54 ms 50th, 108 ms 95th, 141 ms 99th, 158 ms 99.9th. ----zstd方式的第1次测试---- 100000 records sent, 47755.491882 records/sec (46.64 MB/sec), 66.12 ms avg latency, 333.00 ms max latency, 49 ms 50th, 208 ms 95th, 316 ms 99th, 332 ms 99.9th. ----zstd方式的第2次测试---- 100000 records sent, 51203.277010 records/sec (50.00 MB/sec), 60.27 ms avg latency, 265.00 ms max latency, 49 ms 50th, 148 ms 95th, 172 ms 99th, 193 ms 99.9th. ===========分割线=============

根据本次测试,而且测试命令也运行了多次,确实是zstd较好。 而我们再看看官网给出的说明: 2023-03-18-22-04-33

留个坑,下来我重新试试~~~

acks

前面章节中其实我们介绍过acks这个参数。

我们再啰嗦一下acks的可设置值

  • "0": 当消息调用send()发送出去之后就表示消息已经发送成功,不管消息是否已经到达broker
  • "1": 消息发送后,Leader接收到消息并记录到本地之后,不需要同步数据到副本就能进行ack返回
  • "all": 当消息在Leader接收记录,并且等待副本数据同步完成之后,才会返回ack。 该级别也属于Java#Producer的默认配置

接下来我们针对以上三种级别来做一个效果验证

基本就是默认配置,只不过多了一个acks的配置项 ```bash

!/bin/bash

for i in 0 1 all do echo "---------------acks=$i------------------" for ((j=0; j<3; j++)) do kafka-producer-perf-test.sh --topic newTopic_test001 --producer-props bootstrap.servers=master:9092,node01:9092,node02:9092 acks=$i --throughput -1 --num-records 100000 --record-size 1024 done done ```

查看每个级别的三次输出

有数据有真相 ```text --------------acks=0------------------ 100000 records sent, 43878.894252 records/sec (42.85 MB/sec), 60.90 ms avg latency, 367.00 ms max latency, 20 ms 50th, 218 ms 95th, 297 ms 99th, 363 ms 99.9th. 100000 records sent, 38699.690402 records/sec (37.79 MB/sec), 121.56 ms avg latency, 823.00 ms max latency, 60 ms 50th, 469 ms 95th, 736 ms 99th, 814 ms 99.9th. 100000 records sent, 37650.602410 records/sec (36.77 MB/sec), 127.71 ms avg latency, 441.00 ms max latency, 100 ms 50th, 351 ms 95th, 403 ms 99th, 440 ms 99.9th. ---------------acks=1------------------ 100000 records sent, 30284.675954 records/sec (29.57 MB/sec), 387.87 ms avg latency, 897.00 ms max latency, 340 ms 50th, 799 ms 95th, 855 ms 99th, 893 ms 99.9th. 100000 records sent, 36995.930448 records/sec (36.13 MB/sec), 247.35 ms avg latency, 826.00 ms max latency, 199 ms 50th, 703 ms 95th, 789 ms 99th, 813 ms 99.9th. 100000 records sent, 37425.149701 records/sec (36.55 MB/sec), 394.01 ms avg latency, 850.00 ms max latency, 389 ms 50th, 784 ms 95th, 814 ms 99th, 841 ms 99.9th. ---------------acks=all------------------ 76362 records sent, 15272.4 records/sec (14.91 MB/sec), 1422.4 ms avg latency, 1980.0 ms max latency. 100000 records sent, 16084.928422 records/sec (15.71 MB/sec), 1463.71 ms avg latency, 1980.00 ms max latency, 1628 ms 50th, 1903 ms 95th, 1960 ms 99th, 1974 ms 99.9th.

81151 records sent, 16223.7 records/sec (15.84 MB/sec), 1404.0 ms avg latency, 2201.0 ms max latency. 100000 records sent, 16906.170752 records/sec (16.51 MB/sec), 1397.82 ms avg latency, 2201.00 ms max latency, 1491 ms 50th, 2001 ms 95th, 2136 ms 99th, 2193 ms 99.9th.

88156 records sent, 17631.2 records/sec (17.22 MB/sec), 1245.0 ms avg latency, 1688.0 ms max latency. 100000 records sent, 17969.451932 records/sec (17.55 MB/sec), 1255.73 ms avg latency, 1688.00 ms max latency, 1403 ms 50th, 1598 ms 95th, 1635 ms 99th, 1668 ms 99.9th. ```

所以: - 如果是类似日志、行为等不重要的消息,建议将acks设置为0. - 其他的就根据消息的安全程度来进行合理的选择吧~

下期预告

本期针对Producer调优参数的介绍和测试对比到这里就已经结束了。还是一句话: - 从数据看结果:实际生产中进行测试,选择合理的配置参数信息是必须得~

下一期针对数据可靠性我们来做一个详细的介绍。 期待~