Kafka 消費者組位移重設的幾種方式

語言: CN / TW / HK

這是我參與11月更文挑戰的第25天,活動詳情檢視:2021最後一次更文挑戰

相關:Kafka 中的消費者組位移主題:Kafka 的消費者組是怎麼儲存消費位移的?

之前介紹過,Kafka 的消費者可以手動提交位移,並且可以提交併非當前位置的位移,這樣可以實現跳過或者重新消費訊息。這主要是因為 Kafka 是一個基於日誌結構的訊息系統,而並非「佇列」結構。

簡而言之,位移資料是消費者控制的。要注意的是,消費者只能控制位移,不能控制訊息,訊息對於消費者來說,永遠都是隻讀的。

實際上,Kafka 為消費者提供了豐富的重設位移的方式,大致可以分為針對位置重設和根據時間重設。

根據位置重設

根據位置的位移重設有以下幾種

Earliest

把位移重設到當前最早位移處。

這裡要注意,因為 Kafka 會刪除較早的日誌,因此,最早的位置不一定是 0。如果你想重新消費主題中現有的所有訊息,可以使用這個策略。

Latest

把位移重設到當前最新位移處。

如果你想跳過所有的歷史訊息,從最新的訊息開始消費,那麼就是用這個策略。

Current

把位移重設到當前最新提交的位移處。這個策略的使用場景不多。

Specified-Offset

把位移重設到一個指定的位移處。

有時候,消費者會從訊息系統中拉取到無法消費的訊息,比如訊息的格式錯誤,或者消費過程中報錯,再或者某些與業務相關的原因,導致訊息不能消費。此時,可以使用這種策略跳過,消費它之後的訊息。

Shift-By-N

把位移重設到一個與當前位置相對的位置上(當前位置 + N)。

Specified-Offset 可以直接指定要重設的位移位置,而 Shift-By-N 可以指定相對於當前位置的位移。比如 N 是 5 的時候,相當於跳過 5 個訊息,這裡的 N 也可以是負數,這樣就會向回跳。

Specified-Offset 和 Shift-By-N 可以理解為絕對位置相對位置

根據時間重設

根據時間的位移重設有兩種

DateTime

把位移重設為指定時間之後第一個位置。

Duration

把位移重設為與當前時間相對的一個時間點之後的第一個位置。

DateTime 和 Duration 也可以理解為絕對時間相對時間

如何操作

瞭解了這些策略,下面介紹一下具體怎麼操作。

比如,要將消費者組的位移重設到當前最早的位置,可以使用一下命令:

bin/kafka-consumer-groups.sh --bootstrap-server <host>:<port> --group <group_id> --reset-offsets --all-topics --to-earliest –execute

Latest 和 Current 策略與之類似。

Specified-Offset 和 Shift-By-N 則需要在命令中提供具體的值,格式分別是 --to-offset <offset>--shift-by <offset_N>

對於 DateTime 策略的位移重設,需要提供一個具體的時間:

bin/kafka-consumer-groups.sh --bootstrap-server <host>:<port> --group <group_id> --reset-offsets --all-topics --to-datetime 2021-11-25T20:00:00.000 –execute

對於 Duration 策略,需要提供一個符合 ISO-8601 規範的 Duration 格式,以字母 P 開頭,後面由 4 部分組成,即 D、H、M 和 S,分別表示天、小時、分鐘和秒。

bin/kafka-consumer-groups.sh --bootstrap-server <host>:<port> --group <group_id> --reset-offsets --all-topics --by-duration PT0H30M0S –execute

這裡的 PT0H30M0S 代表 30 分鐘。

以上這些命令在執行之後,命令列都會提示新的位移資訊。

如果要在消費者程式中重設位移,Kafka 也提供了相應的消費者 API,一下是 Java API:

java void seek(TopicPartition partition, long offset); void seek(TopicPartition partition, OffsetAndMetadata offsetAndMetadata); void seekToBeginning(Collection<TopicPartition> partitions); void seekToEnd(Collection<TopicPartition> partitions);