Kafka 消费者组位移重设的几种方式

语言: CN / TW / HK

这是我参与11月更文挑战的第25天,活动详情查看:2021最后一次更文挑战

相关:Kafka 中的消费者组位移主题:Kafka 的消费者组是怎么保存消费位移的?

之前介绍过,Kafka 的消费者可以手动提交位移,并且可以提交并非当前位置的位移,这样可以实现跳过或者重新消费消息。这主要是因为 Kafka 是一个基于日志结构的消息系统,而并非「队列」结构。

简而言之,位移数据是消费者控制的。要注意的是,消费者只能控制位移,不能控制消息,消息对于消费者来说,永远都是只读的。

实际上,Kafka 为消费者提供了丰富的重设位移的方式,大致可以分为针对位置重设和根据时间重设。

根据位置重设

根据位置的位移重设有以下几种

Earliest

把位移重设到当前最早位移处。

这里要注意,因为 Kafka 会删除较早的日志,因此,最早的位置不一定是 0。如果你想重新消费主题中现有的所有消息,可以使用这个策略。

Latest

把位移重设到当前最新位移处。

如果你想跳过所有的历史消息,从最新的消息开始消费,那么就是用这个策略。

Current

把位移重设到当前最新提交的位移处。这个策略的使用场景不多。

Specified-Offset

把位移重设到一个指定的位移处。

有时候,消费者会从消息系统中拉取到无法消费的消息,比如消息的格式错误,或者消费过程中报错,再或者某些与业务相关的原因,导致消息不能消费。此时,可以使用这种策略跳过,消费它之后的消息。

Shift-By-N

把位移重设到一个与当前位置相对的位置上(当前位置 + N)。

Specified-Offset 可以直接指定要重设的位移位置,而 Shift-By-N 可以指定相对于当前位置的位移。比如 N 是 5 的时候,相当于跳过 5 个消息,这里的 N 也可以是负数,这样就会向回跳。

Specified-Offset 和 Shift-By-N 可以理解为绝对位置相对位置

根据时间重设

根据时间的位移重设有两种

DateTime

把位移重设为指定时间之后第一个位置。

Duration

把位移重设为与当前时间相对的一个时间点之后的第一个位置。

DateTime 和 Duration 也可以理解为绝对时间相对时间

如何操作

了解了这些策略,下面介绍一下具体怎么操作。

比如,要将消费者组的位移重设到当前最早的位置,可以使用一下命令:

bin/kafka-consumer-groups.sh --bootstrap-server <host>:<port> --group <group_id> --reset-offsets --all-topics --to-earliest –execute

Latest 和 Current 策略与之类似。

Specified-Offset 和 Shift-By-N 则需要在命令中提供具体的值,格式分别是 --to-offset <offset>--shift-by <offset_N>

对于 DateTime 策略的位移重设,需要提供一个具体的时间:

bin/kafka-consumer-groups.sh --bootstrap-server <host>:<port> --group <group_id> --reset-offsets --all-topics --to-datetime 2021-11-25T20:00:00.000 –execute

对于 Duration 策略,需要提供一个符合 ISO-8601 规范的 Duration 格式,以字母 P 开头,后面由 4 部分组成,即 D、H、M 和 S,分别表示天、小时、分钟和秒。

bin/kafka-consumer-groups.sh --bootstrap-server <host>:<port> --group <group_id> --reset-offsets --all-topics --by-duration PT0H30M0S –execute

这里的 PT0H30M0S 代表 30 分钟。

以上这些命令在执行之后,命令行都会提示新的位移信息。

如果要在消费者程序中重设位移,Kafka 也提供了相应的消费者 API,一下是 Java API:

java void seek(TopicPartition partition, long offset); void seek(TopicPartition partition, OffsetAndMetadata offsetAndMetadata); void seekToBeginning(Collection<TopicPartition> partitions); void seekToEnd(Collection<TopicPartition> partitions);