Kafka Fault Tolerance and High Availability Testing (Part 2)

Environment Preparation

  • Test prerequisites

    The earlier tests already showed that with acks=0 or acks=1, data written to Kafka can be lost.

    An unclean leader election will also inevitably lose data; see https://docs.confluent.io/platform/current/installation/configuration/broker-configs.html for details.

    Therefore, all of the following tests use these settings:

    • acks: all

    • unclean.leader.election.enable: false (the default in recent versions)
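For reference, the producer side of these settings looks roughly as follows in the confluent-kafka Python client that the test client below is built on (a minimal sketch; the bootstrap address is taken from the test cluster and is an assumption, and unclean.leader.election.enable is a broker/topic-level setting rather than a producer option):

    from confluent_kafka import Producer

    # acks=all: the leader only acknowledges a write after every replica currently
    # in the ISR has replicated it. How strong that guarantee is in practice also
    # depends on min.insync.replicas, which the scenarios below vary.
    producer = Producer({
        'bootstrap.servers': '172.17.0.3:9092',   # assumed address of one test broker
        'acks': 'all',
    })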

Test Scenario 8:

Scenario setup:

Test what happens if the leader fails when acks=all, unclean.leader.election.enable=false, min.insync.replicas=1, all followers have been kicked out of the ISR because they are slow, and acks has effectively degraded to 1.

Procedure:

The client sends batches of 1,000,000 messages to Kafka with acks=all. While it is writing, slow followers are simulated so that the ISR shrinks to the leader alone; the topic leader is then killed to force a leader change, and finally it is recovered.
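producer.py itself is not reproduced in this report. Based on its invocation (producer.py <count> <wait> <acks> <topic>) and the tracebacks shown later, it behaves roughly like the sketch below (a reconstruction, not the original script; the counter handling is an assumption):

    import sys
    from time import sleep
    from confluent_kafka import Producer

    msg_count = int(sys.argv[1])
    wait_period = float(sys.argv[2])
    acks = sys.argv[3]
    topic = sys.argv[4]
    delivered = failed = 0

    def delivery_report(err, msg):
        # Invoked once per message, either when the broker acknowledges it or
        # when the client finally gives up (e.g. _MSG_TIMED_OUT).
        global delivered, failed
        if err is None:
            delivered += 1
        else:
            failed += 1
            print(err)

    p = Producer({'bootstrap.servers': '172.17.0.3:9092', 'acks': acks})
    for data in range(msg_count):
        # produce() raises BufferError ("Local: Queue full") if the local queue
        # fills up because the broker has stopped acknowledging messages.
        p.produce(topic, str(data).encode('utf-8'), callback=delivery_report)
        p.poll(0)
        sleep(wait_period)
    p.flush()
    print('Delivered:', delivered, 'Failed:', failed)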

Commands:

  • Test version: confluentinc/cp-kafka:5.5.6

Commands run in window 1 (the blockade slow and blockade kill commands are issued while window 2 is writing data):

[email protected]:~/ChaosTestingCode/Kafka/cluster$ reset-cluster.sh;create-topic.sh kafka1 test1
NODE CONTAINER ID STATUS IP NETWORK PARTITION
kafka1 161df85bc82b UP 172.17.0.3 NORMAL
kafka2 42d93122f5c8 UP 172.17.0.4 NORMAL
kafka3 dfe863b7190e UP 172.17.0.5 NORMAL
zk1 e4ac0f4e41c1 UP 172.17.0.2 NORMAL
Acquiring container ids and ip addresses
Updating hosts of kafka1
Updating hosts of kafka2
Updating hosts of kafka3
Created topic test1.
Topic: test1 PartitionCount: 1 ReplicationFactor: 3 Configs:
Topic: test1 Partition: 0 Leader: 1 Replicas: 1,2,3 Isr: 1,2,3

The leader is broker 1, so brokers 2 and 3 are made slow nodes to simulate slow processing; the leader then sees random network latency towards brokers 2 and 3, while its communication with ZooKeeper stays normal.
[email protected]:~/ChaosTestingCode/Kafka/cluster$ blockade slow kafka2 kafka3
[email protected]:~/ChaosTestingCode/Kafka/cluster$ docker exec -it 161df85bc82b bash -c "ping 172.17.0.2"
PING 172.17.0.2 (172.17.0.2) 56(84) bytes of data.
64 bytes from 172.17.0.2: icmp_seq=1 ttl=64 time=0.047 ms
64 bytes from 172.17.0.2: icmp_seq=2 ttl=64 time=0.050 ms
64 bytes from 172.17.0.2: icmp_seq=3 ttl=64 time=0.053 ms
64 bytes from 172.17.0.2: icmp_seq=4 ttl=64 time=0.137 ms
64 bytes from 172.17.0.2: icmp_seq=5 ttl=64 time=0.053 ms
64 bytes from 172.17.0.2: icmp_seq=6 ttl=64 time=0.055 ms
64 bytes from 172.17.0.2: icmp_seq=7 ttl=64 time=0.057 ms
^C
--- 172.17.0.2 ping statistics ---
7 packets transmitted, 7 received, 0% packet loss, time 6132ms
rtt min/avg/max/mdev = 0.047/0.064/0.137/0.030 ms
[email protected]:~/ChaosTestingCode/Kafka/cluster$ docker exec -it 161df85bc82b bash -c "ping 172.17.0.4"
PING 172.17.0.4 (172.17.0.4) 56(84) bytes of data.
64 bytes from 172.17.0.4: icmp_seq=1 ttl=64 time=0.047 ms
64 bytes from 172.17.0.4: icmp_seq=2 ttl=64 time=1727 ms
64 bytes from 172.17.0.4: icmp_seq=4 ttl=64 time=1206 ms
64 bytes from 172.17.0.4: icmp_seq=5 ttl=64 time=359 ms
64 bytes from 172.17.0.4: icmp_seq=3 ttl=64 time=3054 ms
64 bytes from 172.17.0.4: icmp_seq=6 ttl=64 time=1432 ms
64 bytes from 172.17.0.4: icmp_seq=8 ttl=64 time=0.059 ms
64 bytes from 172.17.0.4: icmp_seq=7 ttl=64 time=1639 ms
64 bytes from 172.17.0.4: icmp_seq=9 ttl=64 time=873 ms
^C
--- 172.17.0.4 ping statistics ---
10 packets transmitted, 9 received, 10% packet loss, time 9056ms
rtt min/avg/max/mdev = 0.047/1143.632/3054.774/920.712 ms, pipe 4
[email protected]:~/ChaosTestingCode/Kafka/cluster$ docker exec -it 161df85bc82b bash -c "ping 172.17.0.5"
PING 172.17.0.5 (172.17.0.5) 56(84) bytes of data.
64 bytes from 172.17.0.5: icmp_seq=1 ttl=64 time=507 ms
64 bytes from 172.17.0.5: icmp_seq=3 ttl=64 time=0.054 ms
64 bytes from 172.17.0.5: icmp_seq=4 ttl=64 time=45.6 ms
64 bytes from 172.17.0.5: icmp_seq=2 ttl=64 time=2179 ms
64 bytes from 172.17.0.5: icmp_seq=5 ttl=64 time=0.058 ms
64 bytes from 172.17.0.5: icmp_seq=6 ttl=64 time=824 ms
64 bytes from 172.17.0.5: icmp_seq=7 ttl=64 time=17.7 ms
64 bytes from 172.17.0.5: icmp_seq=8 ttl=64 time=0.051 ms
64 bytes from 172.17.0.5: icmp_seq=9 ttl=64 time=215 ms
^C
--- 172.17.0.5 ping statistics ---
10 packets transmitted, 9 received, 10% packet loss, time 9066ms
rtt min/avg/max/mdev = 0.051/421.125/2179.973/678.343 ms, pipe 3
[email protected]:~/ChaosTestingCode/Kafka/cluster$ blockade status
NODE CONTAINER ID STATUS IP NETWORK PARTITION
kafka1 161df85bc82b UP 172.17.0.3 NORMAL
kafka2 42d93122f5c8 UP 172.17.0.4 SLOW
kafka3 dfe863b7190e UP 172.17.0.5 SLOW
zk1 e4ac0f4e41c1 UP 172.17.0.2 NORMAL

Watch the ISR until it shrinks to the leader alone; during this time the client keeps writing data until it fails and exits.
[email protected]:~/ChaosTestingCode/Kafka/cluster$ show-leader.sh kafka1 test1
...
Topic: test1 PartitionCount: 1 ReplicationFactor: 3 Configs:
Topic: test1 Partition: 0 Leader: 1 Replicas: 1,2,3 Isr: 1,2,3
Topic: test1 PartitionCount: 1 ReplicationFactor: 3 Configs:
Topic: test1 Partition: 0 Leader: 1 Replicas: 1,2,3 Isr: 1,2,3
Topic: test1 PartitionCount: 1 ReplicationFactor: 3 Configs:
Topic: test1 Partition: 0 Leader: 1 Replicas: 1,2,3 Isr: 1,2,3
Topic: test1 PartitionCount: 1 ReplicationFactor: 3 Configs:
Topic: test1 Partition: 0 Leader: 1 Replicas: 1,2,3 Isr: 1
Topic: test1 PartitionCount: 1 ReplicationFactor: 3 Configs:
Topic: test1 Partition: 0 Leader: 1 Replicas: 1,2,3 Isr: 1
Topic: test1 PartitionCount: 1 ReplicationFactor: 3 Configs:
Topic: test1 Partition: 0 Leader: 1 Replicas: 1,2,3 Isr: 1
Topic: test1 PartitionCount: 1 ReplicationFactor: 3 Configs:
Topic: test1 Partition: 0 Leader: 1 Replicas: 1,2,3 Isr: 1
Topic: test1 PartitionCount: 1 ReplicationFactor: 3 Configs:
Topic: test1 Partition: 0 Leader: 1 Replicas: 1,2,3 Isr: 1
Topic: test1 PartitionCount: 1 ReplicationFactor: 3 Configs:
Topic: test1 Partition: 0 Leader: 1 Replicas: 1,2,3 Isr: 1
Topic: test1 PartitionCount: 1 ReplicationFactor: 3 Configs:
Topic: test1 Partition: 0 Leader: 1 Replicas: 1,2,3 Isr: 1
Topic: test1 PartitionCount: 1 ReplicationFactor: 3 Configs:
Topic: test1 Partition: 0 Leader: 1 Replicas: 1,2,3 Isr: 1
^C
[email protected]:~/ChaosTestingCode/Kafka/cluster$ print-topic-details.sh kafka1 test1
Topic: test1 PartitionCount: 1 ReplicationFactor: 3 Configs:
Topic: test1 Partition: 0 Leader: 1 Replicas: 1,2,3 Isr: 1

After the ISR has shrunk to the leader alone, the client can still write data normally; we then simulate a leader crash.
Once the leader fails, an unclean election is not allowed, so no leader change can take place. No new leader is elected, and data can be neither written nor read. The leader shows as none, the ISR stays as it was, and the producer hangs for a while and then fails with timeouts.
[email protected]:~/ChaosTestingCode/Kafka/cluster$ blockade kill kafka1
[email protected]:~/ChaosTestingCode/Kafka/cluster$ print-topic-details.sh kafka1 test1
Error: No such container: kafka-topics
[email protected]:~/ChaosTestingCode/Kafka/cluster$ print-topic-details.sh kafka2 test1
[2021-12-04 02:12:32,587] WARN [AdminClient clientId=adminclient-1] Connection to node 1 (/172.17.0.3:9092) could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
Topic: test1 PartitionCount: 1 ReplicationFactor: 3 Configs:
Topic: test1 Partition: 0 Leader: none Replicas: 1,2,3 Isr: 1
[email protected]:~/ChaosTestingCode/Kafka/cluster$ print-topic-details.sh kafka3 test1
Topic: test1 PartitionCount: 1 ReplicationFactor: 3 Configs:
Topic: test1 Partition: 0 Leader: none Replicas: 1,2,3 Isr: 1
[email protected]:~/ChaosTestingCode/Kafka/cluster$ print-controller.sh kafka2 test1
2

Restart and recover the original leader.
After the original leader comes back, the previously written data can be read and nothing is lost; if the leader could not be recovered, all of the data would be lost.
[email protected]:~/ChaosTestingCode/Kafka/cluster$ blockade start kafka1
update-hosts.sh
[email protected]:~/ChaosTestingCode/Kafka/cluster$ update-hosts.sh
Acquiring container ids and ip addresses
Updating hosts of kafka1
Updating hosts of kafka2
Updating hosts of kafka3
[email protected]:~/ChaosTestingCode/Kafka/cluster$ blockade status
NODE CONTAINER ID STATUS IP NETWORK PARTITION
kafka1 161df85bc82b UP 172.17.0.3 NORMAL
kafka2 42d93122f5c8 UP 172.17.0.4 SLOW
kafka3 dfe863b7190e UP 172.17.0.5 SLOW
zk1 e4ac0f4e41c1 UP 172.17.0.2 NORMAL
[email protected]:~/ChaosTestingCode/Kafka/cluster$ print-topic-details.sh kafka1 test1
Topic: test1 PartitionCount: 1 ReplicationFactor: 3 Configs:
Topic: test1 Partition: 0 Leader: 1 Replicas: 1,2,3 Isr: 1
[email protected]:~/ChaosTestingCode/Kafka/cluster$ print-hw.sh kafka1 19092 test1
test1:0:164834
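print-hw.sh reports the partition high watermark in topic:partition:offset form (here 164834 on partition 0). The same value can be read from the Python client, e.g. (a sketch; broker address and group id are placeholders):

    from confluent_kafka import Consumer, TopicPartition

    c = Consumer({'bootstrap.servers': '172.17.0.3:9092', 'group.id': 'hw-check'})
    # Returns (low, high); "high" is the high watermark, i.e. the offset just past
    # the last committed message, which is what print-hw.sh appears to print.
    low, high = c.get_watermark_offsets(TopicPartition('test1', 0), timeout=10)
    print('test1:0:%d' % high)
    c.close()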

Restore the followers' network. Once all followers are back to normal they catch up from the leader and the ISR returns to normal.
[email protected]:~/ChaosTestingCode/Kafka/cluster$ blockade fast kafka2 kafka3
[email protected]:~/ChaosTestingCode/Kafka/cluster$ blockade status
NODE CONTAINER ID STATUS IP NETWORK PARTITION
kafka1 161df85bc82b UP 172.17.0.3 NORMAL
kafka2 42d93122f5c8 UP 172.17.0.4 NORMAL
kafka3 dfe863b7190e UP 172.17.0.5 NORMAL
zk1 e4ac0f4e41c1 UP 172.17.0.2 NORMAL
[email protected]:~/ChaosTestingCode/Kafka/cluster$ print-topic-details.sh kafka1 test1
Topic: test1 PartitionCount: 1 ReplicationFactor: 3 Configs:
Topic: test1 Partition: 0 Leader: 1 Replicas: 1,2,3 Isr: 1,2,3

Simulate a leader failure again. This time a normal leader election takes place; there are a few write errors, but once the switchover completes writes succeed again, the failed node is removed from the ISR, and no data is lost. The leader is now broker 2.
[email protected]:~/ChaosTestingCode/Kafka/cluster$ blockade kill kafka1
[email protected]:~/ChaosTestingCode/Kafka/cluster$ blockade status
NODE CONTAINER ID STATUS IP NETWORK PARTITION
kafka1 161df85bc82b DOWN UNKNOWN
kafka2 42d93122f5c8 UP 172.17.0.4 NORMAL
kafka3 dfe863b7190e UP 172.17.0.5 NORMAL
zk1 e4ac0f4e41c1 UP 172.17.0.2 NORMAL
[email protected]:~/ChaosTestingCode/Kafka/cluster$ print-topic-details.sh kafka2 test1
Topic: test1 PartitionCount: 1 ReplicationFactor: 3 Configs:
Topic: test1 Partition: 0 Leader: 2 Replicas: 1,2,3 Isr: 2,3

After the failed leader is recovered, it automatically rejoins the ISR and catches up from the new leader.
[email protected]:~/ChaosTestingCode/Kafka/cluster$ blockade start kafka1
update-hosts.sh
[email protected]:~/ChaosTestingCode/Kafka/cluster$ update-hosts.sh
Acquiring container ids and ip addresses
Updating hosts of kafka1
Updating hosts of kafka2
Updating hosts of kafka3
[email protected]:~/ChaosTestingCode/Kafka/cluster$ blockade status
NODE CONTAINER ID STATUS IP NETWORK PARTITION
kafka1 161df85bc82b UP 172.17.0.3 NORMAL
kafka2 42d93122f5c8 UP 172.17.0.4 NORMAL
kafka3 dfe863b7190e UP 172.17.0.5 NORMAL
zk1 e4ac0f4e41c1 UP 172.17.0.2 NORMAL
[email protected]:~/ChaosTestingCode/Kafka/cluster$ print-topic-details.sh kafka2 test1
Topic: test1 PartitionCount: 1 ReplicationFactor: 3 Configs:
Topic: test1 Partition: 0 Leader: 2 Replicas: 1,2,3 Isr: 2,3,1
[email protected]:~/ChaosTestingCode/Kafka/cluster$ print-hw.sh kafka3 19092 test1
test1:0:1164784

Commands run in window 2 (executed after the Kafka cluster has been set up and the topic created):

Writing data while brokers 2 and 3 are slow on the network; the run eventually times out and exits.
[email protected]:~/ChaosTestingCode/Kafka/client$ python producer.py 1000000 0.0001 all test1
Success: 10000 Failed: 0
Success: 20000 Failed: 0
Success: 30000 Failed: 0
%5|1638583780.169|REQTMOUT|rdkafka#producer-1| [thrd:172.17.0.3:9092/bootstrap]: 172.17.0.3:9092/1: Timed out ProduceRequest in flight (after 60755ms, timeout #0)
%5|1638583780.169|REQTMOUT|rdkafka#producer-1| [thrd:172.17.0.3:9092/bootstrap]: 172.17.0.3:9092/1: Timed out ProduceRequest in flight (after 60750ms, timeout #1)
%5|1638583780.169|REQTMOUT|rdkafka#producer-1| [thrd:172.17.0.3:9092/bootstrap]: 172.17.0.3:9092/1: Timed out ProduceRequest in flight (after 60744ms, timeout #2)
%5|1638583780.169|REQTMOUT|rdkafka#producer-1| [thrd:172.17.0.3:9092/bootstrap]: 172.17.0.3:9092/1: Timed out ProduceRequest in flight (after 60739ms, timeout #3)
%5|1638583780.169|REQTMOUT|rdkafka#producer-1| [thrd:172.17.0.3:9092/bootstrap]: 172.17.0.3:9092/1: Timed out ProduceRequest in flight (after 60733ms, timeout #4)
%4|1638583780.169|REQTMOUT|rdkafka#producer-1| [thrd:172.17.0.3:9092/bootstrap]: 172.17.0.3:9092/1: Timed out 125 in-flight, 0 retry-queued, 0 out-queue, 0 partially-sent requests
KafkaError{code=_MSG_TIMED_OUT,val=-192,str="Local: Message timed out"}
KafkaError{code=_MSG_TIMED_OUT,val=-192,str="Local: Message timed out"}
KafkaError{code=_MSG_TIMED_OUT,val=-192,str="Local: Message timed out"}
KafkaError{code=_MSG_TIMED_OUT,val=-192,str="Local: Message timed out"}
KafkaError{code=_MSG_TIMED_OUT,val=-192,str="Local: Message timed out"}
%3|1638583780.169|FAIL|rdkafka#producer-1| [thrd:172.17.0.3:9092/bootstrap]: 172.17.0.3:9092/1: 125 request(s) timed out: disconnect (after 77079ms in state UP)
KafkaError{code=_MSG_TIMED_OUT,val=-192,str="Local: Message timed out"}
KafkaError{code=_MSG_TIMED_OUT,val=-192,str="Local: Message timed out"}
... ...
KafkaError{code=_MSG_TIMED_OUT,val=-192,str="Local: Message timed out"}
KafkaError{code=_MSG_TIMED_OUT,val=-192,str="Local: Message timed out"}
KafkaError{code=_MSG_TIMED_OUT,val=-192,str="Local: Message timed out"}
Traceback (most recent call last):
File "producer.py", line 65, in <module>
p.produce(topic, str(data).encode('utf-8'), callback=delivery_report)
BufferError: Local: Queue full
Sent: 147662
Delivered: 37504
Failed: 10158
%4|1638583794.349|TERMINATE|rdkafka#producer-1| [thrd:app]: Producer terminating with 100000 messages (547663 bytes) still in queue or transit: use flush() to wait for outstanding message delivery

Writing data when the ISR has shrunk to the leader and the leader is then killed; after the leader fails, no more data can be written.
[email protected]:~/ChaosTestingCode/Kafka/client$ python producer.py 1000000 0.0001 all test1
Success: 10000 Failed: 0
Success: 20000 Failed: 0
Success: 30000 Failed: 0
Success: 40000 Failed: 0
Success: 50000 Failed: 0
Success: 60000 Failed: 0
Success: 70000 Failed: 0
Success: 80000 Failed: 0
Success: 90000 Failed: 0
Success: 100000 Failed: 0
%4|1638583911.382|FAIL|rdkafka#producer-1| [thrd:172.17.0.3:9092/bootstrap]: 172.17.0.3:9092/1: Disconnected (after 75954ms in state UP)
%3|1638583911.382|FAIL|rdkafka#producer-1| [thrd:172.17.0.3:9092/bootstrap]: 172.17.0.3:9092/1: Connect to ipv4#172.17.0.3:9092 failed: Connection refused (after 0ms in state CONNECT)
KafkaError{code=_TRANSPORT,val=-195,str="Local: Broker transport failure"}
KafkaError{code=_TRANSPORT,val=-195,str="Local: Broker transport failure"}
... ...
KafkaError{code=_TRANSPORT,val=-195,str="Local: Broker transport failure"}
KafkaError{code=_TRANSPORT,val=-195,str="Local: Broker transport failure"}
%3|1638583978.567|FAIL|rdkafka#producer-1| [thrd:172.17.0.3:9092/bootstrap]: 172.17.0.3:9092/1: Connect to ipv4#172.17.0.3:9092 failed: No route to host (after 66977ms in state CONNECT)
Traceback (most recent call last):
File "producer.py", line 65, in <module>
p.produce(topic, str(data).encode('utf-8'), callback=delivery_report)
BufferError: Local: Queue full
Sent: 207350
Delivered: 107307
Failed: 43
%4|1638583985.412|TERMINATE|rdkafka#producer-1| [thrd:app]: Producer terminating with 100000 messages (600000 bytes) still in queue or transit: use flush() to wait for outstanding message delivery

After all networks are back to normal and the ISR has recovered to 3 nodes, the leader is killed again. A normal leader election takes place; only a small number of sends fail and no data is lost.
[email protected]:~/ChaosTestingCode/Kafka/client$ python producer.py 1000000 0.0001 all test1
Success: 10000 Failed: 0
Success: 20000 Failed: 0
Success: 30000 Failed: 0
Success: 40000 Failed: 0
%4|1638584478.892|FAIL|rdkafka#producer-1| [thrd:172.17.0.3:9092/bootstrap]: 172.17.0.3:9092/1: Disconnected (after 21710ms in state UP)
KafkaError{code=_TRANSPORT,val=-195,str="Local: Broker transport failure"}
KafkaError{code=_TRANSPORT,val=-195,str="Local: Broker transport failure"}
... ...
KafkaError{code=_TRANSPORT,val=-195,str="Local: Broker transport failure"}
KafkaError{code=_TRANSPORT,val=-195,str="Local: Broker transport failure"}
%6|1638584478.893|FAIL|rdkafka#producer-1| [thrd:172.17.0.3:9092/bootstrap]: 172.17.0.3:9092/1: Disconnected while requesting ApiVersion: might be caused by incorrect security.protocol configuration (connecting to a SSL listener?) or broker version is < 0.10 (see api.version.request) (after 0ms in state APIVERSION_QUERY)
Success: 49935 Failed: 65
Success: 59935 Failed: 65
Success: 69935 Failed: 65
Success: 79935 Failed: 65
Success: 89935 Failed: 65
Success: 99935 Failed: 65
Success: 109935 Failed: 65
Success: 119935 Failed: 65
Success: 129935 Failed: 65
Success: 139935 Failed: 65
Success: 149935 Failed: 65
Success: 159935 Failed: 65
Success: 169935 Failed: 65
%3|1638584547.911|FAIL|rdkafka#producer-1| [thrd:172.17.0.3:9092/bootstrap]: 172.17.0.3:9092/1: Connect to ipv4#172.17.0.3:9092 failed: No route to host (after 68719ms in state CONNECT)
Success: 179935 Failed: 65
Success: 189935 Failed: 65
Success: 199935 Failed: 65
Success: 209935 Failed: 65
Success: 219935 Failed: 65
Success: 229935 Failed: 65
Success: 239935 Failed: 65
Success: 249935 Failed: 65
Success: 259935 Failed: 65
Success: 269935 Failed: 65
Success: 279935 Failed: 65
Success: 289935 Failed: 65
Success: 299935 Failed: 65
Success: 309935 Failed: 65
Success: 319935 Failed: 65
Success: 329935 Failed: 65
Success: 339935 Failed: 65
Success: 349935 Failed: 65
Success: 359935 Failed: 65
Success: 369935 Failed: 65
Success: 379935 Failed: 65
Success: 389935 Failed: 65
Success: 399935 Failed: 65
Success: 409935 Failed: 65
Success: 419935 Failed: 65
Success: 429935 Failed: 65
Success: 439935 Failed: 65
Success: 449935 Failed: 65
Success: 459935 Failed: 65
Success: 469935 Failed: 65
Success: 479935 Failed: 65
Success: 489935 Failed: 65
Success: 499935 Failed: 65
Success: 509935 Failed: 65
Success: 519935 Failed: 65
Success: 529935 Failed: 65
Success: 539935 Failed: 65
Success: 549935 Failed: 65
Success: 559935 Failed: 65
Success: 569935 Failed: 65
Success: 579935 Failed: 65
Success: 589935 Failed: 65
Success: 599935 Failed: 65
Success: 609935 Failed: 65
Success: 619935 Failed: 65
Success: 629935 Failed: 65
Success: 639935 Failed: 65
Success: 649935 Failed: 65
Success: 659935 Failed: 65
Success: 669935 Failed: 65
Success: 679935 Failed: 65
Success: 689935 Failed: 65
Success: 699935 Failed: 65
Success: 709935 Failed: 65
Success: 719935 Failed: 65
Success: 729935 Failed: 65
Success: 739935 Failed: 65
Success: 749935 Failed: 65
Success: 759935 Failed: 65
Success: 769935 Failed: 65
Success: 779935 Failed: 65
Success: 789935 Failed: 65
Success: 799935 Failed: 65
Success: 809935 Failed: 65
Success: 819935 Failed: 65
Success: 829935 Failed: 65
Success: 839935 Failed: 65
Success: 849935 Failed: 65
Success: 859935 Failed: 65
Success: 869935 Failed: 65
Success: 879935 Failed: 65
Success: 889935 Failed: 65
Success: 899935 Failed: 65
Success: 909935 Failed: 65
Success: 919935 Failed: 65
Success: 929935 Failed: 65
Success: 939935 Failed: 65
Success: 949935 Failed: 65
Success: 959935 Failed: 65
Success: 969935 Failed: 65
Success: 979935 Failed: 65
Success: 989935 Failed: 65
Success: 999935 Failed: 65
Sent: 1000000
Delivered: 999935
Failed: 65

  • Test version: confluentinc/cp-kafka:6.2.1

Commands run in window 1 (the blockade slow and blockade kill commands are issued while window 2 is writing data):

No difference from the results with confluentinc/cp-kafka:5.5.6.

Commands run in window 2 (executed after the Kafka cluster has been set up and the topic created):

No difference from the results with confluentinc/cp-kafka:5.5.6.

  • Test version: confluentinc/cp-kafka:7.0.0

Commands run in window 1 (the blockade slow and blockade kill commands are issued while window 2 is writing data):

No difference from the results with confluentinc/cp-kafka:5.5.6.

Commands run in window 2 (executed after the Kafka cluster has been set up and the topic created):

No difference from the results with confluentinc/cp-kafka:5.5.6.

Scenario conclusions:

  • Test version: confluentinc/cp-kafka:5.5.6

    With acks=all, unclean.leader.election.enable=false, and min.insync.replicas=1, slow followers are kicked out of the ISR, the ISR shrinks to the leader alone, and acks effectively degrades to 1. Before the ISR has shrunk, the client's writes fail (they time out); once it has shrunk, data can be written normally again.

    If the leader then fails, no leader change takes place because unclean.leader.election.enable=false, and the data can be neither read nor written.

    The only option is to wait for the original leader to come back. If it cannot be recovered, all of the data is lost.

    Once the original leader recovers, reads and writes work normally again. After the network slowness is resolved, a subsequent leader failure is handled by a normal election, and the old leader can rejoin the cluster when it comes back; no data is lost.

  • Test version: confluentinc/cp-kafka:6.2.1

    No difference from the test results with confluentinc/cp-kafka:5.5.6.

  • Test version: confluentinc/cp-kafka:7.0.0

    No difference from the test results with confluentinc/cp-kafka:5.5.6.

Test Scenario 9:

Scenario setup:

Same as Scenario 8, but with min.insync.replicas=2, so that every write must be acknowledged by at least two brokers.

Procedure:

The client sends batches of 1,000,000 messages to Kafka with acks=all. While it is writing, slow followers are simulated so that the ISR shrinks to the leader alone; the topic leader is then killed to force a leader change, and finally it is recovered.

Commands:

  • Test version: confluentinc/cp-kafka:5.5.6

Commands run in window 1 (the blockade slow and blockade kill commands are issued while window 2 is writing data):

[email protected]:~/ChaosTestingCode/Kafka/cluster$ reset-cluster.sh;create-topic-min-insync.sh kafka1 test1 2
NODE CONTAINER ID STATUS IP NETWORK PARTITION
kafka1 3a0f0cb52624 UP 172.17.0.3 NORMAL
kafka2 7893d611b79b UP 172.17.0.4 NORMAL
kafka3 4dfd87691f46 UP 172.17.0.5 NORMAL
zk1 06b129f53213 UP 172.17.0.2 NORMAL
Acquiring container ids and ip addresses
Updating hosts of kafka1
Updating hosts of kafka2
Updating hosts of kafka3
Created topic test1.
Topic: test1 PartitionCount: 1 ReplicationFactor: 3 Configs: min.insync.replicas=2
Topic: test1 Partition: 0 Leader: 3 Replicas: 3,2,1 Isr: 3,2,1
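
The create-topic-min-insync.sh script used above presumably creates the topic with a per-topic min.insync.replicas override, as the Configs column confirms. With the Python AdminClient the equivalent would look roughly like this (a sketch; the bootstrap address is an assumption):

    from confluent_kafka.admin import AdminClient, NewTopic

    admin = AdminClient({'bootstrap.servers': '172.17.0.3:9092'})
    new_topic = NewTopic('test1', num_partitions=1, replication_factor=3,
                         config={'min.insync.replicas': '2'})
    # create_topics() returns a dict of futures keyed by topic name;
    # result() raises if the creation failed.
    for name, future in admin.create_topics([new_topic]).items():
        future.result()
        print('Created topic', name)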

The current leader is broker 3. Simulate slow network on brokers 1 and 2 so that the ISR shrinks to the leader alone.
[email protected]:~/ChaosTestingCode/Kafka/cluster$ blockade slow kafka1 kafka2
[email protected]:~/ChaosTestingCode/Kafka/cluster$ blockade status
NODE CONTAINER ID STATUS IP NETWORK PARTITION
kafka1 3a0f0cb52624 UP 172.17.0.3 SLOW
kafka2 7893d611b79b UP 172.17.0.4 SLOW
kafka3 4dfd87691f46 UP 172.17.0.5 NORMAL
zk1 06b129f53213 UP 172.17.0.2 NORMAL
[email protected]:~/ChaosTestingCode/Kafka/cluster$ show-leader.sh kafka3 test1
...
Topic: test1 PartitionCount: 1 ReplicationFactor: 3 Configs: min.insync.replicas=2
Topic: test1 Partition: 0 Leader: 3 Replicas: 3,2,1 Isr: 3,2,1
Topic: test1 PartitionCount: 1 ReplicationFactor: 3 Configs: min.insync.replicas=2
Topic: test1 Partition: 0 Leader: 3 Replicas: 3,2,1 Isr: 3,2,1
Topic: test1 PartitionCount: 1 ReplicationFactor: 3 Configs: min.insync.replicas=2
Topic: test1 Partition: 0 Leader: 3 Replicas: 3,2,1 Isr: 3,2,1
Topic: test1 PartitionCount: 1 ReplicationFactor: 3 Configs: min.insync.replicas=2
Topic: test1 Partition: 0 Leader: 3 Replicas: 3,2,1 Isr: 3
Topic: test1 PartitionCount: 1 ReplicationFactor: 3 Configs: min.insync.replicas=2
Topic: test1 Partition: 0 Leader: 3 Replicas: 3,2,1 Isr: 3
Topic: test1 PartitionCount: 1 ReplicationFactor: 3 Configs: min.insync.replicas=2
Topic: test1 Partition: 0 Leader: 3 Replicas: 3,2,1 Isr: 3
Topic: test1 PartitionCount: 1 ReplicationFactor: 3 Configs: min.insync.replicas=2
Topic: test1 Partition: 0 Leader: 3 Replicas: 3,2,1 Isr: 3
^C

While the leader is still alive but the ISR has shrunk to the leader only, writes fail because min.insync.replicas=2 is not satisfied, producing the error code=NOT_ENOUGH_REPLICAS,val=19,str="Broker: Not enough in-sync replicas". Next, simulate a crash of the leader node:
[email protected]:~/ChaosTestingCode/Kafka/cluster$ blockade kill kafka3
[email protected]:~/ChaosTestingCode/Kafka/cluster$ blockade status
NODE CONTAINER ID STATUS IP NETWORK PARTITION
kafka1 3a0f0cb52624 UP 172.17.0.3 SLOW
kafka2 7893d611b79b UP 172.17.0.4 SLOW
kafka3 4dfd87691f46 DOWN UNKNOWN
zk1 06b129f53213 UP 172.17.0.2 NORMAL

After the leader fails, no new leader can be elected; data can be neither written nor read, the ISR stays unchanged, and the producer hangs for a while and then times out. Recover the original leader:
[email protected]:~/ChaosTestingCode/Kafka/cluster$ blockade start kafka3
[email protected]:~/ChaosTestingCode/Kafka/cluster$ update-hosts.sh
Acquiring container ids and ip addresses
Updating hosts of kafka1
Updating hosts of kafka2
Updating hosts of kafka3
[email protected]:~/ChaosTestingCode/Kafka/cluster$ blockade status
NODE CONTAINER ID STATUS IP NETWORK PARTITION
kafka1 3a0f0cb52624 UP 172.17.0.3 SLOW
kafka2 7893d611b79b UP 172.17.0.4 SLOW
kafka3 4dfd87691f46 UP 172.17.0.5 NORMAL
zk1 06b129f53213 UP 172.17.0.2 NORMAL
[email protected]:~/ChaosTestingCode/Kafka/cluster$ print-topic-details.sh kafka3 test1
Topic: test1 PartitionCount: 1 ReplicationFactor: 3 Configs: min.insync.replicas=2
Topic: test1 Partition: 0 Leader: 3 Replicas: 3,2,1 Isr: 3

After the original leader comes back, the previously written data is available again and nothing is lost; if the leader could not be recovered, all data would be lost.
[email protected]:~/ChaosTestingCode/Kafka/cluster$ print-hw.sh kafka3 19092 test1
test1:0:59063

Once all followers' networks are back to normal, they catch up from the leader and the ISR recovers. Since the ISR now has at least 2 nodes (in fact all 3), writes succeed again.
[email protected]:~/ChaosTestingCode/Kafka/cluster$ blockade fast kafka1 kafka2
[email protected]:~/ChaosTestingCode/Kafka/cluster$ blockade status
NODE CONTAINER ID STATUS IP NETWORK PARTITION
kafka1 3a0f0cb52624 UP 172.17.0.3 NORMAL
kafka2 7893d611b79b UP 172.17.0.4 NORMAL
kafka3 4dfd87691f46 UP 172.17.0.5 NORMAL
zk1 06b129f53213 UP 172.17.0.2 NORMAL
[email protected]:~/ChaosTestingCode/Kafka/cluster$ print-topic-details.sh kafka3 test1
Topic: test1 PartitionCount: 1 ReplicationFactor: 3 Configs: min.insync.replicas=2
Topic: test1 Partition: 0 Leader: 3 Replicas: 3,2,1 Isr: 3,2,1

Now fail the leader again; a leader election takes place with a few write errors. Once the switchover completes writes succeed again, the failed node is removed from the ISR, and no data is lost.
[email protected]:~/ChaosTestingCode/Kafka/cluster$ blockade kill kafka3
[email protected]:~/ChaosTestingCode/Kafka/cluster$ blockade status
NODE CONTAINER ID STATUS IP NETWORK PARTITION
kafka1 3a0f0cb52624 UP 172.17.0.3 NORMAL
kafka2 7893d611b79b UP 172.17.0.4 NORMAL
kafka3 4dfd87691f46 DOWN UNKNOWN
zk1 06b129f53213 UP 172.17.0.2 NORMAL
[email protected]:~/ChaosTestingCode/Kafka/cluster$ print-topic-details.sh kafka2 test1
Topic: test1 PartitionCount: 1 ReplicationFactor: 3 Configs: min.insync.replicas=2
Topic: test1 Partition: 0 Leader: 2 Replicas: 3,2,1 Isr: 2,1

Then the new leader is failed as well, shrinking the ISR to a single node. Writes fail because min.insync.replicas=2 is no longer satisfied, again with code=NOT_ENOUGH_REPLICAS,val=19,str="Broker: Not enough in-sync replicas". At this point the data can only be read (a minimal consumer sketch follows this step's output).
[email protected]:~/ChaosTestingCode/Kafka/cluster$ blockade kill kafka2
[email protected]:~/ChaosTestingCode/Kafka/cluster$ blockade status
NODE CONTAINER ID STATUS IP NETWORK PARTITION
kafka1 3a0f0cb52624 UP 172.17.0.3 NORMAL
kafka2 7893d611b79b DOWN UNKNOWN
kafka3 4dfd87691f46 DOWN UNKNOWN
zk1 06b129f53213 UP 172.17.0.2 NORMAL
[email protected]:~/ChaosTestingCode/Kafka/cluster$ print-topic-details.sh kafka1 test1
Topic: test1 PartitionCount: 1 ReplicationFactor: 3 Configs: min.insync.replicas=2
Topic: test1 Partition: 0 Leader: 1 Replicas: 3,2,1 Isr: 1
[email protected]:~/ChaosTestingCode/Kafka/cluster$ print-hw.sh kafka1 19092 test1
test1:0:209578
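
As noted above, the partition is still readable in this state even though producing fails. A minimal consumer sketch for checking that (broker address and group id are placeholders):

    from confluent_kafka import Consumer

    c = Consumer({
        'bootstrap.servers': '172.17.0.3:9092',
        'group.id': 'readback-check',
        'auto.offset.reset': 'earliest',
    })
    c.subscribe(['test1'])
    count = 0
    while True:
        msg = c.poll(5.0)
        if msg is None:
            break                      # nothing new within the timeout
        if msg.error():
            print(msg.error())
            continue
        count += 1
    print('consumed %d messages' % count)
    c.close()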

Commands run in window 2 (executed after the Kafka cluster has been set up and the topic created):

[email protected]:~/ChaosTestingCode/Kafka/client$ python producer.py 1000000 0.0001 all test1
%5|1638601273.277|REQTMOUT|rdkafka#producer-1| [thrd:172.17.0.5:9092/bootstrap]: 172.17.0.5:9092/3: Timed out ProduceRequest in flight (after 60988ms, timeout #0)
%5|1638601273.277|REQTMOUT|rdkafka#producer-1| [thrd:172.17.0.5:9092/bootstrap]: 172.17.0.5:9092/3: Timed out ProduceRequest in flight (after 60981ms, timeout #1)
%5|1638601273.277|REQTMOUT|rdkafka#producer-1| [thrd:172.17.0.5:9092/bootstrap]: 172.17.0.5:9092/3: Timed out ProduceRequest in flight (after 60975ms, timeout #2)
%5|1638601273.277|REQTMOUT|rdkafka#producer-1| [thrd:172.17.0.5:9092/bootstrap]: 172.17.0.5:9092/3: Timed out ProduceRequest in flight (after 60969ms, timeout #3)
%5|1638601273.277|REQTMOUT|rdkafka#producer-1| [thrd:172.17.0.5:9092/bootstrap]: 172.17.0.5:9092/3: Timed out ProduceRequest in flight (after 60963ms, timeout #4)
KafkaError{code=_MSG_TIMED_OUT,val=-192,str="Local: Message timed out"}
KafkaError{code=_MSG_TIMED_OUT,val=-192,str="Local: Message timed out"}
KafkaError{code=_MSG_TIMED_OUT,val=-192,str="Local: Message timed out"}
KafkaError{code=_MSG_TIMED_OUT,val=-192,str="Local: Message timed out"}
KafkaError{code=_MSG_TIMED_OUT,val=-192,str="Local: Message timed out"}
KafkaError{code=_MSG_TIMED_OUT,val=-192,str="Local: Message timed out"}
KafkaError{code=_MSG_TIMED_OUT,val=-192,str="Local: Message timed out"}
KafkaError{code=_MSG_TIMED_OUT,val=-192,str="Local: Message timed out"}
KafkaError{code=_MSG_TIMED_OUT,val=-192,str="Local: Message timed out"}
KafkaError{code=_MSG_TIMED_OUT,val=-192,str="Local: Message timed out"}
KafkaError{code=_MSG_TIMED_OUT,val=-192,str="Local: Message timed out"}
...
KafkaError{code=NOT_ENOUGH_REPLICAS,val=19,str="Broker: Not enough in-sync replicas"}
KafkaError{code=NOT_ENOUGH_REPLICAS,val=19,str="Broker: Not enough in-sync replicas"}
KafkaError{code=NOT_ENOUGH_REPLICAS,val=19,str="Broker: Not enough in-sync replicas"}
KafkaError{code=NOT_ENOUGH_REPLICAS,val=19,str="Broker: Not enough in-sync replicas"}
KafkaError{code=NOT_ENOUGH_REPLICAS,val=19,str="Broker: Not enough in-sync replicas"}
KafkaError{code=NOT_ENOUGH_REPLICAS,val=19,str="Broker: Not enough in-sync replicas"}
KafkaError{code=NOT_ENOUGH_REPLICAS,val=19,str="Broker: Not enough in-sync replicas"}
...
KafkaError{code=NOT_ENOUGH_REPLICAS,val=19,str="Broker: Not enough in-sync replicas"}
^CTraceback (most recent call last):
File "producer.py", line 68, in <module>
sleep(wait_period)
KeyboardInterrupt
Sent: 9566
Delivered: 0
Failed: 9556
%4|1638601352.387|TERMINATE|rdkafka#producer-1| [thrd:app]: Producer terminating with 10 messages (40 bytes) still in queue or transit: use flush() to wait for outstanding message delivery
[email protected]:~/ChaosTestingCode/Kafka/client$ python producer.py 1000000 0.0001 all test1
%3|1638601448.775|FAIL|rdkafka#producer-1| [thrd:172.17.0.5:9092/bootstrap]: 172.17.0.5:9092/bootstrap: Connect to ipv4#172.17.0.5:9092 failed: No route to host (after 18562ms in state CONNECT)
Traceback (most recent call last):
File "producer.py", line 65, in <module>
p.produce(topic, str(data).encode('utf-8'), callback=delivery_report)
BufferError: Local: Queue full
Sent: 100000
Delivered: 0
Failed: 0
%4|1638601500.477|TERMINATE|rdkafka#producer-1| [thrd:app]: Producer terminating with 100000 messages (488895 bytes) still in queue or transit: use flush() to wait for outstanding message delivery
[email protected]:~/ChaosTestingCode/Kafka/client$ python producer.py 1000000 0.0001 all test1
Success: 10000 Failed: 0
Success: 20000 Failed: 0
Success: 30000 Failed: 0
Success: 40000 Failed: 0
%4|1638601740.244|FAIL|rdkafka#producer-1| [thrd:172.17.0.5:9092/bootstrap]: 172.17.0.5:9092/3: Disconnected (after 22236ms in state UP)
KafkaError{code=_TRANSPORT,val=-195,str="Local: Broker transport failure"}
KafkaError{code=_TRANSPORT,val=-195,str="Local: Broker transport failure"}
KafkaError{code=_TRANSPORT,val=-195,str="Local: Broker transport failure"}
KafkaError{code=_TRANSPORT,val=-195,str="Local: Broker transport failure"}
KafkaError{code=_TRANSPORT,val=-195,str="Local: Broker transport failure"}
KafkaError{code=_TRANSPORT,val=-195,str="Local: Broker transport failure"}
KafkaError{code=_TRANSPORT,val=-195,str="Local: Broker transport failure"}
KafkaError{code=_TRANSPORT,val=-195,str="Local: Broker transport failure"}
KafkaError{code=_TRANSPORT,val=-195,str="Local: Broker transport failure"}
...
%6|1638601740.246|FAIL|rdkafka#producer-1| [thrd:172.17.0.5:9092/bootstrap]: 172.17.0.5:9092/3: Disconnected while requesting ApiVersion: might be caused by incorrect security.protocol configuration (connecting to a SSL listener?) or broker version is < 0.10 (see api.version.request) (after 1ms in state APIVERSION_QUERY)
Success: 49949 Failed: 51
Success: 59949 Failed: 51
Success: 69949 Failed: 51
Success: 79949 Failed: 51
Success: 89949 Failed: 51
Success: 99949 Failed: 51
Success: 109949 Failed: 51
Success: 119949 Failed: 51
Success: 129949 Failed: 51
Success: 139949 Failed: 51
Success: 149949 Failed: 51
%4|1638601791.642|FAIL|rdkafka#producer-1| [thrd:172.17.0.4:9092/bootstrap]: 172.17.0.4:9092/2: Disconnected (after 31598ms in state UP)
%3|1638601791.643|FAIL|rdkafka#producer-1| [thrd:172.17.0.4:9092/bootstrap]: 172.17.0.4:9092/2: Connect to ipv4#172.17.0.4:9092 failed: Connection refused (after 0ms in state CONNECT)
KafkaError{code=_TRANSPORT,val=-195,str="Local: Broker transport failure"}
KafkaError{code=_TRANSPORT,val=-195,str="Local: Broker transport failure"}
KafkaError{code=_TRANSPORT,val=-195,str="Local: Broker transport failure"}
KafkaError{code=_TRANSPORT,val=-195,str="Local: Broker transport failure"}
...
KafkaError{code=NOT_ENOUGH_REPLICAS,val=19,str="Broker: Not enough in-sync replicas"}
KafkaError{code=NOT_ENOUGH_REPLICAS,val=19,str="Broker: Not enough in-sync replicas"}
KafkaError{code=NOT_ENOUGH_REPLICAS,val=19,str="Broker: Not enough in-sync replicas"}
KafkaError{code=NOT_ENOUGH_REPLICAS,val=19,str="Broker: Not enough in-sync replicas"}
KafkaError{code=NOT_ENOUGH_REPLICAS,val=19,str="Broker: Not enough in-sync replicas"}
KafkaError{code=NOT_ENOUGH_REPLICAS,val=19,str="Broker: Not enough in-sync replicas"}
KafkaError{code=NOT_ENOUGH_REPLICAS,val=19,str="Broker: Not enough in-sync replicas"}
KafkaError{code=NOT_ENOUGH_REPLICAS,val=19,str="Broker: Not enough in-sync replicas"}
^CTraceback (most recent call last):
File "producer.py", line 68, in <module>
sleep(wait_period)
KeyboardInterrupt
Sent: 282974
Delivered: 150515
Failed: 132456
%4|1638601861.504|TERMINATE|rdkafka#producer-1| [thrd:app]: Producer terminating with 3 messages (18 bytes) still in queue or transit: use flush() to wait for outstanding message delivery
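
The NOT_ENOUGH_REPLICAS errors above are broker-side rejections surfaced through the producer's delivery callback. A hedged sketch of how a client can tell them apart from transport problems (the constant name is from the confluent-kafka client; the handling policy is only illustrative):

    from confluent_kafka import KafkaError

    def delivery_report(err, msg):
        if err is None:
            return                      # message committed by the required ISR
        if err.code() == KafkaError.NOT_ENOUGH_REPLICAS:
            # The ISR has fewer members than min.insync.replicas: the broker rejected
            # the write and nothing was committed. Retrying only helps once the ISR
            # has recovered.
            print('rejected (ISR below min.insync.replicas):', err)
        else:
            print('delivery failed:', err)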

  • Test version: confluentinc/cp-kafka:6.2.1

Commands run in window 1 (the blockade slow and blockade kill commands are issued while window 2 is writing data):

No difference from the results with confluentinc/cp-kafka:5.5.6.

Commands run in window 2 (executed after the Kafka cluster has been set up and the topic created):

No difference from the results with confluentinc/cp-kafka:5.5.6.

  • Test version: confluentinc/cp-kafka:7.0.0

Commands run in window 1 (the blockade slow and blockade kill commands are issued while window 2 is writing data):

No difference from the results with confluentinc/cp-kafka:5.5.6.

Commands run in window 2 (executed after the Kafka cluster has been set up and the topic created):

No difference from the results with confluentinc/cp-kafka:5.5.6.

Scenario conclusions:

  • Test version: confluentinc/cp-kafka:5.5.6

    This scenario behaves like Scenario 8, except that once the number of brokers in the ISR drops below 2, data can no longer be written and can only be read.

  • Test version: confluentinc/cp-kafka:6.2.1

    No difference from the test results with confluentinc/cp-kafka:5.5.6.

  • Test version: confluentinc/cp-kafka:7.0.0

    No difference from the test results with confluentinc/cp-kafka:5.5.6.

Test Scenario 10:

Scenario setup:

Same as Scenario 9, but the original leader suffers an unrecoverable disk failure and rejoins the cluster after losing all of its data.

Procedure:

The client sends batches of 1,000,000 messages to Kafka with acks=all. While it is writing, slow followers are simulated so that the ISR shrinks to the leader alone, and the topic leader is then killed to force a leader change. During the recovery of the original leader an unrecoverable disk failure is simulated, so all data on the original leader is lost.

Commands:

  • Test version: confluentinc/cp-kafka:5.5.6

Commands run in window 1 (the blockade slow and blockade kill commands are issued while window 2 is writing data):

[email protected]:~/ChaosTestingCode/Kafka/cluster$ reset-cluster.sh;create-topic-min-insync.sh kafka1 test1 2
NODE CONTAINER ID STATUS IP NETWORK PARTITION
kafka1 3d3dacd9b095 UP 172.17.0.3 NORMAL
kafka2 931bd63b61e2 UP 172.17.0.4 NORMAL
kafka3 955172d1a496 UP 172.17.0.5 NORMAL
zk1 94404576cf08 UP 172.17.0.2 NORMAL
Acquiring container ids and ip addresses
Updating hosts of kafka1
Updating hosts of kafka2
Updating hosts of kafka3
Created topic test1.
Topic: test1 PartitionCount: 1 ReplicationFactor: 3 Configs: min.insync.replicas=2
Topic: test1 Partition: 0 Leader: 1 Replicas: 1,2,3 Isr: 1,2,3

The current leader is broker 1, and a large amount of data has already been written to the cluster.
[email protected]:~/ChaosTestingCode/Kafka/cluster$ print-hw.sh kafka1 19092 test1
test1:0:109498

Simulate slowness on the follower nodes:
[email protected]:~/ChaosTestingCode/Kafka/cluster$ blockade slow kafka2 kafka3
[email protected]:~/ChaosTestingCode/Kafka/cluster$ blockade status
NODE CONTAINER ID STATUS IP NETWORK PARTITION
kafka1 3d3dacd9b095 UP 172.17.0.3 NORMAL
kafka2 931bd63b61e2 UP 172.17.0.4 SLOW
kafka3 955172d1a496 UP 172.17.0.5 SLOW
zk1 94404576cf08 UP 172.17.0.2 NORMAL

Eventually the ISR shrinks to just the leader node.
[email protected]:~/ChaosTestingCode/Kafka/cluster$ show-leader.sh kafka1 test1
Topic: test1 PartitionCount: 1 ReplicationFactor: 3 Configs: min.insync.replicas=2
Topic: test1 Partition: 0 Leader: 1 Replicas: 1,2,3 Isr: 1,2,3
Topic: test1 PartitionCount: 1 ReplicationFactor: 3 Configs: min.insync.replicas=2
Topic: test1 Partition: 0 Leader: 1 Replicas: 1,2,3 Isr: 1,2,3
...
Topic: test1 PartitionCount: 1 ReplicationFactor: 3 Configs: min.insync.replicas=2
Topic: test1 Partition: 0 Leader: 1 Replicas: 1,2,3 Isr: 1,2,3
Topic: test1 PartitionCount: 1 ReplicationFactor: 3 Configs: min.insync.replicas=2
Topic: test1 Partition: 0 Leader: 1 Replicas: 1,2,3 Isr: 1
Topic: test1 PartitionCount: 1 ReplicationFactor: 3 Configs: min.insync.replicas=2
Topic: test1 Partition: 0 Leader: 1 Replicas: 1,2,3 Isr: 1
Topic: test1 PartitionCount: 1 ReplicationFactor: 3 Configs: min.insync.replicas=2
Topic: test1 Partition: 0 Leader: 1 Replicas: 1,2,3 Isr: 1
Topic: test1 PartitionCount: 1 ReplicationFactor: 3 Configs: min.insync.replicas=2
Topic: test1 Partition: 0 Leader: 1 Replicas: 1,2,3 Isr: 1
^C

At this point the leader node already holds a large amount of data.
[email protected]:~/ChaosTestingCode/Kafka/cluster$ print-hw.sh kafka1 19092 test1
test1:0:159391

Simulate a crash of the leader node. Although the followers also hold part of the data, the partition is unavailable.
[email protected]:~/ChaosTestingCode/Kafka/cluster$ blockade kill kafka1
[email protected]:~/ChaosTestingCode/Kafka/cluster$ blockade status
NODE CONTAINER ID STATUS IP NETWORK PARTITION
kafka1 3d3dacd9b095 DOWN UNKNOWN
kafka2 931bd63b61e2 UP 172.17.0.4 SLOW
kafka3 955172d1a496 UP 172.17.0.5 SLOW
zk1 94404576cf08 UP 172.17.0.2 NORMAL
[email protected]:~/ChaosTestingCode/Kafka/cluster$ print-topic-details.sh kafka2 test1
[2021-12-04 08:53:54,124] WARN [AdminClient clientId=adminclient-1] Connection to node 1 (/172.17.0.3:9092) could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
Topic: test1 PartitionCount: 1 ReplicationFactor: 3 Configs: min.insync.replicas=2
Topic: test1 Partition: 0 Leader: none Replicas: 1,2,3 Isr: 1

Simulate an unrecoverable disk failure on the original leader, losing all of its data:
[email protected]:~/ChaosTestingCode/Kafka/cluster$ sudo rm -rf volumes/kafka/01/data/
[email protected]:~/ChaosTestingCode/Kafka/cluster$ mkdir -p volumes/kafka/01/data/

Start the original leader node:
[email protected]:~/ChaosTestingCode/Kafka/cluster$ blockade start kafka1
[email protected]:~/ChaosTestingCode/Kafka/cluster$ update-hosts.sh
Acquiring container ids and ip addresses
Updating hosts of kafka1
Updating hosts of kafka2
Updating hosts of kafka3
[email protected]:~/ChaosTestingCode/Kafka/cluster$ blockade status
NODE CONTAINER ID STATUS IP NETWORK PARTITION
kafka1 3d3dacd9b095 UP 172.17.0.3 NORMAL
kafka2 931bd63b61e2 UP 172.17.0.4 SLOW
kafka3 955172d1a496 UP 172.17.0.5 SLOW
zk1 94404576cf08 UP 172.17.0.2 NORMAL

The original leader rejoins the cluster normally, but all of the data the cluster previously held is gone.
[email protected]:~/ChaosTestingCode/Kafka/cluster$ print-hw.sh kafka1 19092 test1
test1:0:0
[email protected]:~/ChaosTestingCode/Kafka/cluster$ blockade fast kafka2 kafka3
[email protected]:~/ChaosTestingCode/Kafka/cluster$ print-hw.sh kafka2 19092 test1
test1:0:0
[email protected]:~/ChaosTestingCode/Kafka/cluster$ print-topic-details.sh kafka2 test1
Topic: test1 PartitionCount: 1 ReplicationFactor: 3 Configs: min.insync.replicas=2
Topic: test1 Partition: 0 Leader: 1 Replicas: 1,2,3 Isr: 1,2,3
[email protected]:~/ChaosTestingCode/Kafka/cluster$ blockade kill kafka1
[email protected]:~/ChaosTestingCode/Kafka/cluster$ blockade status
NODE CONTAINER ID STATUS IP NETWORK PARTITION
kafka1 3d3dacd9b095 DOWN UNKNOWN
kafka2 931bd63b61e2 UP 172.17.0.4 NORMAL
kafka3 955172d1a496 UP 172.17.0.5 NORMAL
zk1 94404576cf08 UP 172.17.0.2 NORMAL
[email protected]:~/ChaosTestingCode/Kafka/cluster$ print-topic-details.sh kafka2 test1
Topic: test1 PartitionCount: 1 ReplicationFactor: 3 Configs: min.insync.replicas=2
Topic: test1 Partition: 0 Leader: 2 Replicas: 1,2,3 Isr: 2,3
[email protected]:~/ChaosTestingCode/Kafka/cluster$ print-hw.sh kafka2 19092 test1
test1:0:0

Log output on the original leader after restarting with its data lost:
Since nothing from before can be found ("No checkpointed highwatermark is found for partition test1-0"), the log is loaded "for partition test1-0 with initial high watermark 0", and then "test1-0 starts at leader epoch 2 from offset 0 with high watermark 0".

[2021-12-05 02:14:05,737] ERROR [Broker id=1] Received LeaderAndIsrRequest with correlation id 1 from controller 3 epoch 2 for partition test1-0 (last update controller epoch 2) but cannot become follower since the new leader -1 is unavailable. (state.change.logger)
[2021-12-05 02:14:05,799] INFO [Log partition=test1-0, dir=/var/lib/kafka/data] Loading producer state till offset 0 with message format version 2 (kafka.log.Log)
[2021-12-05 02:14:05,805] INFO [Log partition=test1-0, dir=/var/lib/kafka/data] Completed load of log with 1 segments, log start offset 0 and log end offset 0 in 34 ms (kafka.log.Log)
[2021-12-05 02:14:05,810] INFO Created log for partition test1-0 in /var/lib/kafka/data/test1-0 with properties {min.insync.replicas=2} (kafka.log.LogManager)
[2021-12-05 02:14:05,811] INFO [Partition test1-0 broker=1] No checkpointed highwatermark is found for partition test1-0 (kafka.cluster.Partition)
[2021-12-05 02:14:05,811] INFO [Partition test1-0 broker=1] Log loaded for partition test1-0 with initial high watermark 0 (kafka.cluster.Partition)
[2021-12-05 02:14:05,811] ERROR [Broker id=1] Received LeaderAndIsrRequest with correlation id 1 from controller 3 epoch 2 for partition __confluent.support.metrics-0 (last update controller epoch 2) but cannot become follower since the new leader -1 is unavailable. (state.change.logger)
[2021-12-05 02:14:05,815] INFO [Log partition=__confluent.support.metrics-0, dir=/var/lib/kafka/data] Loading producer state till offset 0 with message format version 2 (kafka.log.Log)
[2021-12-05 02:14:05,815] INFO [Log partition=__confluent.support.metrics-0, dir=/var/lib/kafka/data] Completed load of log with 1 segments, log start offset 0 and log end offset 0 in 1 ms (kafka.log.Log)
[2021-12-05 02:14:05,816] INFO Created log for partition __confluent.support.metrics-0 in /var/lib/kafka/data/__confluent.support.metrics-0 with properties {retention.ms=31536000000} (kafka.log.LogManager)
[2021-12-05 02:14:05,816] INFO [Partition __confluent.support.metrics-0 broker=1] No checkpointed highwatermark is found for partition __confluent.support.metrics-0 (kafka.cluster.Partition)
[2021-12-05 02:14:05,816] INFO [Partition __confluent.support.metrics-0 broker=1] Log loaded for partition __confluent.support.metrics-0 with initial high watermark 0 (kafka.cluster.Partition)
[2021-12-05 02:14:10,185] INFO [ReplicaFetcherManager on broker 1] Removed fetcher for partitions Set(test1-0, __confluent.support.metrics-0) (kafka.server.ReplicaFetcherManager)
[2021-12-05 02:14:10,191] INFO [Partition test1-0 broker=1] test1-0 starts at leader epoch 2 from offset 0 with high watermark 0. Previous leader epoch was -1. (kafka.cluster.Partition)
[2021-12-05 02:14:10,200] INFO [Partition __confluent.support.metrics-0 broker=1] __confluent.support.metrics-0 starts at leader epoch 2 from offset 0 with high watermark 0. Previous leader epoch was -1. (kafka.cluster.Partition)
[2021-12-05 02:14:11,394] INFO [Partition test1-0 broker=1] Expanding ISR from 1 to 1,2 (kafka.cluster.Partition)
[2021-12-05 02:14:11,401] INFO [Partition test1-0 broker=1] ISR updated to [1,2] and zkVersion updated to [4] (kafka.cluster.Partition)
[2021-12-05 02:14:14,459] INFO [Partition test1-0 broker=1] Expanding ISR from 1,2 to 1,2,3 (kafka.cluster.Partition)
[2021-12-05 02:14:14,462] INFO [Partition test1-0 broker=1] ISR updated to [1,2,3] and zkVersion updated to [5] (kafka.cluster.Partition)

Log output on the followers:
After hitting the UNKNOWN_LEADER_EPOCH error, they truncate their local data ("Truncating to offset 0").

[2021-12-05 02:13:42,140] WARN [ReplicaFetcher replicaId=2, leaderId=1, fetcherId=0] Error in response for fetch request (type=FetchRequest, replicaId=2, maxWait=500, minBytes=1, maxBytes=10485760, fetchData={test1-0=(fetchOffset=57962, logStartOffset=0, maxBytes=1048576, currentLeaderEpoch=Optional[0])}, isolationLevel=READ_UNCOMMITTED, toForget=, metadata=(sessionId=115803198, epoch=INITIAL), rackId=) (kafka.server.ReplicaFetcherThread)
java.io.IOException: Connection to 1 was disconnected before the response was read
at org.apache.kafka.clients.NetworkClientUtils.sendAndReceive(NetworkClientUtils.java:100)
at kafka.server.ReplicaFetcherBlockingSend.sendRequest(ReplicaFetcherBlockingSend.scala:108)
at kafka.server.ReplicaFetcherThread.fetchFromLeader(ReplicaFetcherThread.scala:206)
at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:304)
at kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3(AbstractFetcherThread.scala:137)
at kafka.server.AbstractFetcherThread.maybeFetch(AbstractFetcherThread.scala:136)
at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:119)
at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:96)
[2021-12-05 02:13:45,821] WARN [ReplicaFetcher replicaId=2, leaderId=1, fetcherId=0] Connection to node 1 (kafka1/172.17.0.3:19092) could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
[2021-12-05 02:13:45,822] INFO [ReplicaFetcher replicaId=2, leaderId=1, fetcherId=0] Error sending fetch request (sessionId=115803198, epoch=INITIAL) to node 1: {}. (org.apache.kafka.clients.FetchSessionHandler)
java.io.IOException: Connection to kafka1:19092 (id: 1 rack: null) failed.
at org.apache.kafka.clients.NetworkClientUtils.awaitReady(NetworkClientUtils.java:71)
at kafka.server.ReplicaFetcherBlockingSend.sendRequest(ReplicaFetcherBlockingSend.scala:103)
at kafka.server.ReplicaFetcherThread.fetchFromLeader(ReplicaFetcherThread.scala:206)
at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:304)
at kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3(AbstractFetcherThread.scala:137)
at kafka.server.AbstractFetcherThread.maybeFetch(AbstractFetcherThread.scala:136)
at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:119)
at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:96)
[2021-12-05 02:13:45,822] WARN [ReplicaFetcher replicaId=2, leaderId=1, fetcherId=0] Error in response for fetch request (type=FetchRequest, replicaId=2, maxWait=500, minBytes=1, maxBytes=10485760, fetchData={test1-0=(fetchOffset=57962, logStartOffset=0, maxBytes=1048576, currentLeaderEpoch=Optional[0])}, isolationLevel=READ_UNCOMMITTED, toForget=, metadata=(sessionId=115803198, epoch=INITIAL), rackId=) (kafka.server.ReplicaFetcherThread)
java.io.IOException: Connection to kafka1:19092 (id: 1 rack: null) failed.
at org.apache.kafka.clients.NetworkClientUtils.awaitReady(NetworkClientUtils.java:71)
at kafka.server.ReplicaFetcherBlockingSend.sendRequest(ReplicaFetcherBlockingSend.scala:103)
at kafka.server.ReplicaFetcherThread.fetchFromLeader(ReplicaFetcherThread.scala:206)
at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:304)
at kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3(AbstractFetcherThread.scala:137)
at kafka.server.AbstractFetcherThread.maybeFetch(AbstractFetcherThread.scala:136)
at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:119)
at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:96)
[2021-12-05 02:13:50,568] ERROR [ReplicaFetcher replicaId=2, leaderId=1, fetcherId=0] Error for partition test1-0 at offset 57962 (kafka.server.ReplicaFetcherThread)
org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This server does not host this topic-partition.
[2021-12-05 02:13:53,741] ERROR [ReplicaFetcher replicaId=2, leaderId=1, fetcherId=0] Error for partition test1-0 at offset 57962 (kafka.server.ReplicaFetcherThread)
org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This server does not host this topic-partition.
[2021-12-05 02:13:57,128] ERROR [ReplicaFetcher replicaId=2, leaderId=1, fetcherId=0] Error for partition test1-0 at offset 57962 (kafka.server.ReplicaFetcherThread)
org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This server does not host this topic-partition.
[2021-12-05 02:13:59,409] ERROR [Broker id=2] Received LeaderAndIsrRequest with correlation id 3 from controller 3 epoch 2 for partition test1-0 (last update controller epoch 2) but cannot become follower since the new leader -1 is unavailable. (state.change.logger)
[2021-12-05 02:13:59,689] ERROR [ReplicaFetcher replicaId=2, leaderId=1, fetcherId=0] Error for partition test1-0 at offset 57962 (kafka.server.ReplicaFetcherThread)
org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This server does not host this topic-partition.
[2021-12-05 02:14:03,821] ERROR [ReplicaFetcher replicaId=2, leaderId=1, fetcherId=0] Error for partition test1-0 at offset 57962 (kafka.server.ReplicaFetcherThread)
org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This server does not host this topic-partition.
[2021-12-05 02:14:06,095] INFO [ReplicaFetcherManager on broker 2] Removed fetcher for partitions Set(test1-0) (kafka.server.ReplicaFetcherManager)
[2021-12-05 02:14:06,096] INFO [ReplicaFetcherManager on broker 2] Added fetcher to broker 1 for partitions Map(test1-0 -> (offset=57962, leaderEpoch=2)) (kafka.server.ReplicaFetcherManager)
[2021-12-05 02:14:07,130] INFO [ReplicaFetcher replicaId=2, leaderId=1, fetcherId=0] Retrying leaderEpoch request for partition test1-0 as the leader reported an error: UNKNOWN_LEADER_EPOCH (kafka.server.ReplicaFetcherThread)
[2021-12-05 02:14:10,372] INFO [ReplicaFetcher replicaId=2, leaderId=1, fetcherId=0] Retrying leaderEpoch request for partition test1-0 as the leader reported an error: UNKNOWN_LEADER_EPOCH (kafka.server.ReplicaFetcherThread)
[2021-12-05 02:14:11,381] INFO [Log partition=test1-0, dir=/var/lib/kafka/data] Truncating to offset 0 (kafka.log.Log)
[2021-12-05 02:14:11,383] INFO [Log partition=test1-0, dir=/var/lib/kafka/data] Scheduling segments for deletion List() (kafka.log.Log)
[2021-12-05 02:14:11,390] INFO [Log partition=test1-0, dir=/var/lib/kafka/data] Loading producer state till offset 0 with message format version 2

Commands run in window 2 (executed after the Kafka cluster has been set up and the topic created):

[email protected]:~/ChaosTestingCode/Kafka/client$ python producer.py 1000000 0.0001 all test1
Success: 10000 Failed: 0
Success: 20000 Failed: 0
Success: 30000 Failed: 0
Success: 40000 Failed: 0
Success: 50000 Failed: 0
Success: 60000 Failed: 0
Success: 70000 Failed: 0
Success: 80000 Failed: 0
Success: 90000 Failed: 0
Success: 100000 Failed: 0
Success: 110000 Failed: 0
Success: 120000 Failed: 0
Success: 130000 Failed: 0
Traceback (most recent call last):
File "producer.py", line 65, in <module>
p.produce(topic, str(data).encode('utf-8'), callback=delivery_report)
BufferError: Local: Queue full
Sent: 237429
Delivered: 137429
Failed: 0
%4|1638607813.104|TERMINATE|rdkafka#producer-1| [thrd:app]: Producer terminating with 100000 messages (600000 bytes) still in queue or transit: use flush() to wait for outstanding message delivery
[email protected]:~/ChaosTestingCode/Kafka/client$ python producer.py 1000000 0.0001 all test1
%5|1638607895.990|REQTMOUT|rdkafka#producer-1| [thrd:172.17.0.3:9092/bootstrap]: 172.17.0.3:9092/1: Timed out ProduceRequest in flight (after 60011ms, timeout #0)
%5|1638607895.990|REQTMOUT|rdkafka#producer-1| [thrd:172.17.0.3:9092/bootstrap]: 172.17.0.3:9092/1: Timed out ProduceRequest in flight (after 60005ms, timeout #1)
%4|1638607895.990|REQTMOUT|rdkafka#producer-1| [thrd:172.17.0.3:9092/bootstrap]: 172.17.0.3:9092/1: Timed out 2 in-flight, 0 retry-queued, 0 out-queue, 0 partially-sent requests
KafkaError{code=_MSG_TIMED_OUT,val=-192,str="Local: Message timed out"}
%3|1638607895.990|FAIL|rdkafka#producer-1| [thrd:172.17.0.3:9092/bootstrap]: 172.17.0.3:9092/1: 2 request(s) timed out: disconnect (after 60072ms in state UP)
KafkaError{code=_MSG_TIMED_OUT,val=-192,str="Local: Message timed out"}
KafkaError{code=_MSG_TIMED_OUT,val=-192,str="Local: Message timed out"}
KafkaError{code=_MSG_TIMED_OUT,val=-192,str="Local: Message timed out"}
KafkaError{code=_MSG_TIMED_OUT,val=-192,str="Local: Message timed out"}
KafkaError{code=_MSG_TIMED_OUT,val=-192,str="Local: Message timed out"}
KafkaError{code=_MSG_TIMED_OUT,val=-192,str="Local: Message timed out"}
...
KafkaError{code=_MSG_TIMED_OUT,val=-192,str="Local: Message timed out"}
KafkaError{code=_MSG_TIMED_OUT,val=-192,str="Local: Message timed out"}
KafkaError{code=_MSG_TIMED_OUT,val=-192,str="Local: Message timed out"}
KafkaError{code=_MSG_TIMED_OUT,val=-192,str="Local: Message timed out"}
KafkaError{code=_MSG_TIMED_OUT,val=-192,str="Local: Message timed out"}
Traceback (most recent call last):
File "producer.py", line 65, in <module>
p.produce(topic, str(data).encode('utf-8'), callback=delivery_report)
BufferError: Local: Queue full
Sent: 110645
Delivered: 1926
Failed: 8719
%4|1638607913.414|TERMINATE|rdkafka#producer-1| [thrd:app]: Producer terminating with 100000 messages (510646 bytes) still in queue or transit: use flush() to wait for outstanding message delivery

  • Test version: confluentinc/cp-kafka:6.2.1

Commands run in window 1 (the blockade slow and blockade kill commands are issued while window 2 is writing data):

No difference from the results with confluentinc/cp-kafka:5.5.6.

Commands run in window 2 (executed after the Kafka cluster has been set up and the topic created):

No difference from the results with confluentinc/cp-kafka:5.5.6.

  • Test version: confluentinc/cp-kafka:7.0.0

Commands run in window 1 (the blockade slow and blockade kill commands are issued while window 2 is writing data):

No difference from the results with confluentinc/cp-kafka:5.5.6.

Commands run in window 2 (executed after the Kafka cluster has been set up and the topic created):

No difference from the results with confluentinc/cp-kafka:5.5.6.

Scenario conclusions:

  • Test version: confluentinc/cp-kafka:5.5.6

    Once the ISR has shrunk to the leader alone for whatever reason (slow nodes, slow network, node crashes, and so on), if the leader then crashes, its disk fails unrecoverably, and it rejoins the cluster with all its data gone, the entire topic's data is lost, even though the other followers still hold part of it.

    In earlier versions this situation would bring the whole cluster down; the followers would log messages like the following:

    log from kafka2

    [2021-10-13 06:09:03,601] ERROR [ReplicaFetcher replicaId=2, leaderId=1, fetcherId=0] Exiting because log truncation is not allowed for partition test1-0, current leader's latest offset 0 is less than replica's latest offset 100955 (kafka.server.ReplicaFetcherThread)

    log from kafka3

    [2021-10-13 06:09:02,856] ERROR [ReplicaFetcher replicaId=3, leaderId=1, fetcherId=0] Exiting because log truncation is not allowed for partition test1-0, current leader's latest offset 0 is less than replica's latest offset 100955 (kafka.server.ReplicaFetcherThread)

  • Test version: confluentinc/cp-kafka:6.2.1

    No difference from the test results with confluentinc/cp-kafka:5.5.6.

  • Test version: confluentinc/cp-kafka:7.0.0

    No difference from the test results with confluentinc/cp-kafka:5.5.6.

Recap of Data-Loss Scenarios

Based on the tests above, the situations that can lead to Kafka losing data are:

  • A leader failover happens while acks=1.

  • Even with acks=all, an unclean failover is allowed (the follower elected as the new leader is no longer in the ISR).

  • With acks=1, a network partition between the leader and ZooKeeper causes a brief split-brain.

  • With the ISR already shrunk to the leader alone, the leader becomes isolated from the other Kafka nodes and from ZooKeeper; even with acks=all the old leader keeps accepting writes, which are then lost. This is especially likely when min.insync.replicas=1, and it is worst when the leader then suffers an unrecoverable disk failure: restoring it with empty data eventually wipes out the whole topic.

  • All replicas of a partition fail at the same time. Because writes are acknowledged as soon as they are in memory, the data may not have been flushed to disk yet, so it is lost when the nodes come back.

These situations can be mitigated to some extent through configuration. Unclean failovers can be avoided by setting unclean.leader.election.enable=false, and a data redundancy of at least 2 can be enforced by setting min.insync.replicas to 2 or more. Environments that need high reliability usually set acks=all together with min.insync.replicas greater than 1. The cost of doing so is:

  • Higher write latency and lower throughput.

  • Writes are rejected once the ISR degrades to the leader alone, reducing availability.

  • Because failover to an unclean node is not allowed, an unrecoverable leader failure leaves the partition unavailable even though other nodes hold part of the data, again reducing availability.

About the Author

Zhan Yulin (詹玉林) is an engineer in the open source software support group of the Information Technology Department at China Minsheng Bank.