Kafka 消費者組位移重設的幾種方式
這是我參與11月更文挑戰的第25天,活動詳情檢視:2021最後一次更文挑戰
相關:Kafka 中的消費者組 | 位移主題:Kafka 的消費者組是怎麼儲存消費位移的?
之前介紹過,Kafka 的消費者可以手動提交位移,並且可以提交併非當前位置的位移,這樣可以實現跳過或者重新消費訊息。這主要是因為 Kafka 是一個基於日誌結構的訊息系統,而並非「佇列」結構。
簡而言之,位移資料是消費者控制的。要注意的是,消費者只能控制位移,不能控制訊息,訊息對於消費者來說,永遠都是隻讀的。
實際上,Kafka 為消費者提供了豐富的重設位移的方式,大致可以分為針對位置重設和根據時間重設。
根據位置重設
根據位置的位移重設有以下幾種
Earliest
把位移重設到當前最早位移處。
這裡要注意,因為 Kafka 會刪除較早的日誌,因此,最早的位置不一定是 0。如果你想重新消費主題中現有的所有訊息,可以使用這個策略。
Latest
把位移重設到當前最新位移處。
如果你想跳過所有的歷史訊息,從最新的訊息開始消費,那麼就是用這個策略。
Current
把位移重設到當前最新提交的位移處。這個策略的使用場景不多。
Specified-Offset
把位移重設到一個指定的位移處。
有時候,消費者會從訊息系統中拉取到無法消費的訊息,比如訊息的格式錯誤,或者消費過程中報錯,再或者某些與業務相關的原因,導致訊息不能消費。此時,可以使用這種策略跳過,消費它之後的訊息。
Shift-By-N
把位移重設到一個與當前位置相對的位置上(當前位置 + N)。
Specified-Offset 可以直接指定要重設的位移位置,而 Shift-By-N 可以指定相對於當前位置的位移。比如 N 是 5 的時候,相當於跳過 5 個訊息,這裡的 N 也可以是負數,這樣就會向回跳。
Specified-Offset 和 Shift-By-N 可以理解為絕對位置和相對位置。
根據時間重設
根據時間的位移重設有兩種
DateTime
把位移重設為指定時間之後第一個位置。
Duration
把位移重設為與當前時間相對的一個時間點之後的第一個位置。
DateTime 和 Duration 也可以理解為絕對時間和相對時間。
如何操作
瞭解了這些策略,下面介紹一下具體怎麼操作。
比如,要將消費者組的位移重設到當前最早的位置,可以使用一下命令:
bin/kafka-consumer-groups.sh --bootstrap-server <host>:<port> --group <group_id> --reset-offsets --all-topics --to-earliest –execute
Latest 和 Current 策略與之類似。
Specified-Offset 和 Shift-By-N 則需要在命令中提供具體的值,格式分別是 --to-offset <offset>
和 --shift-by <offset_N>
。
對於 DateTime 策略的位移重設,需要提供一個具體的時間:
bin/kafka-consumer-groups.sh --bootstrap-server <host>:<port> --group <group_id> --reset-offsets --all-topics --to-datetime 2021-11-25T20:00:00.000 –execute
對於 Duration 策略,需要提供一個符合 ISO-8601 規範的 Duration 格式,以字母 P 開頭,後面由 4 部分組成,即 D、H、M 和 S,分別表示天、小時、分鐘和秒。
bin/kafka-consumer-groups.sh --bootstrap-server <host>:<port> --group <group_id> --reset-offsets --all-topics --by-duration PT0H30M0S –execute
這裡的 PT0H30M0S
代表 30 分鐘。
以上這些命令在執行之後,命令列都會提示新的位移資訊。
如果要在消費者程式中重設位移,Kafka 也提供了相應的消費者 API,一下是 Java API:
java
void seek(TopicPartition partition, long offset);
void seek(TopicPartition partition, OffsetAndMetadata offsetAndMetadata);
void seekToBeginning(Collection<TopicPartition> partitions);
void seekToEnd(Collection<TopicPartition> partitions);
- Spring 原始碼閱讀 42:AutowiredAnnotationBeanPostProcessor 分析(3)
- Spring 原始碼閱讀 41:AutowiredAnnotationBeanPostProcessor 分析(2)
- Kafka 消費者組 Rebalance 詳細過程
- Spring 原始碼閱讀 01:Resource 資源抽象
- 初識機器學習:迴歸分析
- 初識機器學習:Louvain 社群發現演算法
- 初識機器學習:關聯規則
- 使用 Redis 實現分散式鎖的方法
- Kafka 目錄裡的指令碼那麼多,它們都是用來幹什麼的?
- Kafka 消費者組位移重設的幾種方式
- LeetCode - 84. 柱狀圖中最大的矩形
- LeetCode - 22. 括號生成