RocketMQ 5.0: POP 消費模式 原理詳解 & 原始碼解析
RocketMQ POP 消費模式 原理詳解 & 原始碼解析
原文地址:http://hscarb.github.io/rocketmq/20221212-rocketmq-consumer-7-pop-consume.md
1. 背景
1.1 什麼是 Pop 消費
RocketMQ 5.0 中引入了一種新的消費模式:Pop 消費模式。
我們知道 RocketMQ 原來有兩種消費模式:Pull 模式消費和 Push 模式消費,其中 Push 模式指的是 Broker 將訊息主動“推送”給消費者,它的背後其實是消費者在不斷地 Pull 訊息來實現類似於 Broker “推”訊息給消費者的效果。
新引入的 Pop 消費模式主要是用於 Push 消費時將拉訊息的動作替換成 Pop 。Pop 消費的行為和 Pull 消費很像,區別在於 Pop 消費的重平衡是在 Broker 端做的,而之前的 Pull 和 Push 消費都是由客戶端完成重平衡。
1.2 如何使用 Pop 消費
RocketMQ 提供了 2 種方式,能夠讓 Push 消費切換為使用 Pop 模式拉取訊息(Pull 消費暫不支援切換 Pop 模式),分別為命令列方式切換和客戶端程式碼方式切換。
1.2.1 使用命令列方式切換
利用命令列,用如下命令,指定叢集和需要切換的消費組,可以將一個消費組切換成 Pop 消費模式消費某個 Topic
mqadmin setConsumeMode -c cluster -t topic -g group -m POP -q 8
以下為引數含義
opt = new Option("c", "clusterName", true, "create subscription group to which cluster");
opt = new Option("t", "topicName", true, "topic name");
opt = new Option("g", "groupName", true, "consumer group name");
opt = new Option("m", "mode", true, "consume mode. PULL/POP");
opt = new Option("q", "popShareQueueNum", true, "num of queue which share in pop mode");
1.2.2 程式碼切換
在建立 Consumer 之前,先執行 switchPop()
方法,它其實與上面命令列的邏輯一樣,也是傳送請求給叢集中的所有 Broker 節點,讓它們切換對應消費者組和 Topic 的消費者的消費模式為 Pop 模式。
// PopPushConsumer.java
public class PopPushConsumer {
public static final String CONSUMER_GROUP = "CID_JODIE_1";
public static final String TOPIC = "TopicTest";
// Or use AdminTools directly: mqadmin setConsumeMode -c cluster -t topic -g group -m POP -n 8
private static void switchPop() throws Exception {
DefaultMQAdminExt mqAdminExt = new DefaultMQAdminExt();
mqAdminExt.start();
ClusterInfo clusterInfo = mqAdminExt.examineBrokerClusterInfo();
Set<String> brokerAddrs = clusterInfo.getBrokerAddrTable().values().stream().map(BrokerData::selectBrokerAddr).collect(Collectors.toSet());
for (String brokerAddr : brokerAddrs) {
mqAdminExt.setMessageRequestMode(brokerAddr, TOPIC, CONSUMER_GROUP, MessageRequestMode.POP, 8, 3_000);
}
}
public static void main(String[] args) throws Exception {
switchPop();
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(CONSUMER_GROUP);
consumer.subscribe(TOPIC, "*");
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
});
consumer.setClientRebalance(false);
consumer.start();
System.out.printf("Consumer Started.%n");
}
}
1.3 引入 Pop 消費模式的原因
引入 Pop 消費主要的原因是由於 Push 消費的機制導致它存在一些痛點。RocketMQ 5.0 雲原生化的要求催生著一種能夠解決這些痛點的新消費模式誕生。
Push 消費模式的重平衡邏輯是在客戶端完成的,這就導致了幾個問題:
- 客戶端程式碼邏輯較重,要支援一種新語言的客戶端就必須實現完整的重平衡邏輯,此外還需要實現拉訊息、位點管理、消費失敗後將訊息發回 Broker 重試等邏輯。這給多語言客戶端的支援造成很大的阻礙。
- 當客戶端升級或者下線時,都要進行重平衡操作,可能造成訊息堆積。
此外,Push 消費的特性是重平衡後每個消費者都分配到消費一定數量的佇列,而每個佇列最多隻能被一個消費者消費。這就決定了消費者的橫向擴充套件能力受到 Topic 中佇列數量的限制。這裡有引入瞭如下痛點
- 消費者無法無限擴充套件,當消費者數量擴大到大於佇列數量時,有的消費者將無法分配到佇列。
- 當某些消費者僵死(hang 住)時(與 Broker 的心跳未斷,但是無法消費訊息),會造成其消費的佇列的訊息堆積,遲遲無法被消費,也不會主動重平衡來解決這個問題。
引入 Pop 消費模式之後,可以解決 Push 消費導致的可能的訊息堆積問題和橫向擴充套件能力問題。此外,RocketMQ 5.0 中引入了的輕量化客戶端就用到了 Pop 消費能力,將 Pop 消費介面用 gRPC 封裝,實現了多語言輕量化客戶端,而不必在客戶端實現重平衡邏輯。詳見該專案 rocketmq-clients。
2. 概要設計
Pop 消費主要的設計思想是將繁重的客戶端邏輯如重平衡、消費進度提交、消費失敗後發到 Broker 重試等邏輯放到 Broker 端。
客戶端只需要不斷髮送 Pop 請求,由 Broker 端來分配每次拉取請求要拉取的佇列並返回訊息。這樣就可以實現多個客戶端同時拉取一個佇列的效果,不會存在一個客戶端 hang 住導致佇列訊息堆積,也不會存在頻繁的重平衡導致訊息積壓。
2.1 Pop 消費流程
為了保證消費速度,Pop 消費一次請求可以拉取一批訊息,拉取到的訊息系統屬性中有一個比較重要的屬性叫做 POP_CK
,它是該訊息的控制代碼,ACK 時要通過控制代碼來定位到它。在 Broker 端會為這批訊息儲存一個 CheckPoint
,它裡面包含一批訊息的控制代碼資訊。
對於長時間沒有 ACK 的訊息,Broker 端並非毫無辦法。Pop 消費引入了訊息不可見時間(invisibleTime)的機制。當 Pop 出一條訊息後,這條訊息對所有消費者不可見,即進入不可見時間,當它超過該時刻還沒有被 ACK,Broker 將會把它放入 Pop 專門的重試 Topic(這個過程稱為 Revive),這條訊息重新可以被消費。
Push 消費的重試間隔時間會隨著重試次數而增加,Pop 消費也沿用了這個設計。此外,Pop 消費提供了一個介面 changeInvisibleTime()
來修改單條訊息的不可見時間。
從圖上可以看見,本來訊息會在中間這個時間點再一次的可見的,但是我們在可見之前提前使用 changeInvisibleTime
延長了不可見時間,讓這條訊息的可見時間推遲了。
當消費失敗(使用者業務程式碼返回 reconsumeLater 或者拋異常)的時候,消費者就通過 changeInvisibleTime
按照重試次數來修改下一次的可見時間。另外如果消費訊息用時超過了 30 秒(預設值,可以修改),則 Broker 也會把訊息放到重試佇列。
2.2 客戶端-服務端互動
Pop 消費的流程與 Push 消費較為相似,這裡我分為 5 個步驟。
- 向 Broker 端傳送請求,切換訊息拉取模式為 Pop 模式
- 重平衡服務執行重平衡,此時已經切換為 Pop 模式,所以是向 Broker 端發起請求,請求中帶有重平衡策略,Broker 會返回重平衡的結果。
- 重平衡完畢之後開始拉取訊息,拉取訊息服務傳送
POP_MESSAGE
請求給 Broker,獲取一批訊息 - 消費這批訊息
- 對成功消費的訊息,傳送 ACK 請求給 Broker
2.3 服務端實現
服務端收到 Pop 請求後,會先在 Queue 維度上加鎖,保證同一時間只有一個消費者可以拉取該佇列的訊息。
隨後服務端會在儲存中查詢一批訊息,將這批訊息的構建的 CheckPoint
儲存在 Broker 中,以便與 ACK 的訊息匹配。
CheckPoint
的存在目的是與 ACK 的訊息匹配,並將沒有匹配的訊息重試。CheckPoint
的 ReviveTime
就是它這批訊息需要被嘗試重試(喚醒)的時間。
CheckPoint
會先被儲存在記憶體中,一般來說訊息消費很快,所以在記憶體中就能夠與 ACK 訊息匹配成功後刪除。如果在一段時間(預設 3s)內沒有匹配成功,它將會從記憶體中被刪除,轉入磁碟等待匹配。
對於 ACK 訊息也一樣,它先被放入記憶體中匹配,如果在記憶體中找不到對應的 CheckPoint
,也會放入磁碟。
RocketMQ 的磁碟儲存實際上就是 Topic 和佇列。為了避免頻繁檢查匹配狀態,我們只在 CheckPoint
需要被喚醒時做檢查,這裡就可以用到定時訊息,將 CheckPoint
和 ACK 訊息定時到 ReviveTime
投遞。這裡 RocketMQ 將 CheckPoint
的投遞時間提前 1s,以便能先消費到,與 ACK 訊息匹配。
當定時到期,它們會被投遞到 REVIVE_TOPIC
。有個後臺執行緒消費這個 Topic,把 CheckPoint
放到一個 map 中,對於 ACK 訊息則從 map 中查詢 CheckPoint
來嘗試匹配,如果匹配成功則更新 REVIVE_TOPIC
的消費位點。對於超過 ReviveTime
還沒有被匹配的 CheckPoint
,查出這批訊息中要重試訊息對應的真實訊息,並放到 Pop 消費重試 Topic 中。
Broker 端的 Pop 消費邏輯會概率性消費到重試 Topic 中的訊息。
3. 詳細設計
3.1 Broker 端重平衡
Pop 消費的重平衡在 Broker 端完成,客戶端的重平衡服務重平衡時會向 Broker 端傳送查詢請求,查詢自己的分配結果。
重平衡的主要邏輯其實與在客戶端重平衡類似,只不過變成了 Broker 接收客戶端的引數之後根據這些引數進行重平衡,然後把重平衡結果返回給客戶端。
Broker 端重平衡入口為 QueryAssignmentProcessor#doLoadBalance()
。
對於廣播模式,直接返回 Topic 下所有的佇列。
對於叢集模式,Pop 模式的重平衡與 Push 模式不同,它允許一個佇列被多個消費者 Pop 消費。在切換 Pop 模式時引入了 popShareQueueNum
引數,表示允許消費者進行額外的負載獲取佇列的次數(可以被共享的佇列數),0 表示可以消費所有佇列。
所以重平衡時對每個消費者執行 popShareQueueNum
次重平衡策略,將多次重平衡分配到的佇列都分給這個消費者消費。這樣,每個佇列就會被多個消費者消費。
下圖為 popShareQueueNum = 1
時的重平衡情況,每個消費者被負載了 2 次,每個佇列被 2 個消費者共享(1 + popShareQueueNum
)。
3.2 Broker 端 Pop 訊息
3.2.1 請求處理入口
Pop 訊息的 Broker 端處理是由 PopMessageProcessor#processRequest()
完成。
該方法邏輯為
- 完成請求體解析和一些引數和許可權的校驗
- 生成一個 0 到 99 的隨機整數,如果能被 5 整除,則先拉取重試 Topic。
- 從重試 Topic 的每個 Queue 中 Pop 訊息
- 根據請求的佇列 Pop 對應的佇列的訊息。如果 Pop 請求指定了佇列,只會消費一個佇列的訊息;如果沒有指定佇列,則 Pop 所有佇列的訊息
- 如果 Pop 的訊息沒有滿(達到請求的最大訊息數量),且之前沒有拉取過重試訊息,則 Pop 重試 Topic 所有佇列的訊息(期望填充滿 Pop 請求要求的數量)
- 判斷是否 Pop 到訊息,如果有則傳輸回客戶端,如果沒有則掛起輪詢,直到超過請求的 timeout 引數指定的時間
3.2.2 Pop 訊息方法
上面的 3、4、5 都涉及到從儲存中 Pop 訊息,它們都呼叫同一個方法:popMsgFromQueue
,它是真正查詢訊息的方法,下面看一下它的邏輯
- 將需要 Pop 的佇列上鎖(用
AtomicBoolean
實現) - 計算 Pop 訊息的起始偏移量,會返回記憶體中 CheckPoint 與 ACK 訊息匹配後的最新位點
- 從磁碟中根據起始偏移量查詢一批訊息
- 計算佇列剩餘的訊息數量(用作返回值)
- 拉取的這批訊息將生成一個
CheckPoint
,存入記憶體和磁碟 - 解鎖佇列
- 返回 Pop 到的訊息
上面方法第 5 步會將生成的 CheckPoint
放入記憶體和磁碟,注意這個 CheckPoint
會儲存一批獲取到的訊息的起始偏移量和相對偏移量(相對於起始偏移量),所以一個 CheckPoint
在儲存和匹配時都對應一批訊息。
3.2.3 儲存 CheckPoint
用於匹配
- 構造
CheckPoint
,新增起始偏移量和所有 Pop 出的訊息的相對偏移量 - 嘗試將
CheckPoint
新增到記憶體 Buffer,如果成功則直接返回。但是在記憶體中匹配CheckPoint
和AckMsg
的開關預設是關閉的,所以這裡不會加入到記憶體,會繼續後面的邏輯放入磁碟 - 將
CheckPoint
構造成一個訊息,資料都放到訊息體中,然後這個訊息定時到ReviveTime
(喚醒重試的時間)- 1s(為了留時間與AckMsg
匹配)傳送。會發送到 ReviveTopic 的一個佇列。
3.3 Broker 端 ACK 訊息
Ack 訊息介面每次只允許 Ack 一條訊息,入口是 AckMessageProcessor#processRequest()
- 從請求頭解析和構造 Ack 訊息,並作一些校驗
- 順序訊息 Ack 和普通訊息 Ack 分別處理,這裡針對普通訊息
- 先嚐試將 Ack 訊息放入記憶體 Buffer,如果成功則直接返回。失敗則有可能是記憶體匹配未開啟。
- 如果放入記憶體失敗,構造一個用於存到磁碟的訊息,定時到喚醒重試時間投遞(到 ReviveTopic)。
3.4 Broker 端 CheckPoint
與 AckMsg
匹配
CheckPoint
和 AckMsg
都被設計成先嚐試放入記憶體中匹配,然後再磁碟中匹配,因為通常情況下訊息消費之後都能很快 ACK,記憶體匹配效能較高。如果 CheckPoint
在記憶體中停留太久沒有被匹配,則會轉移到磁碟中(ReviveTopic),有個執行緒消費這個 ReviveTopic 來匹配。到達喚醒重試時間(ReviveTime)還沒有被匹配的 CheckPoint
裡面的訊息將會重試(傳送到 Pop 訊息重試 Topic,後面的 Pop 有概率消費到)。
3.4.1 記憶體匹配
記憶體匹配邏輯由一個執行緒 PopBufferMergeService
完成,只有主節點執行該匹配執行緒。
Pop 訊息時會先新增 CheckPoint
到 buffer,Ack 訊息時嘗試從記憶體 buffer 中的 CheckPoint
匹配。同時,它每 5ms 執行一次掃描,將不符合記憶體中存活條件的 CheckPoint
移除,放入磁碟儲存。
addCk
方法將 CheckPoint
放入記憶體 Buffer。CheckPoint
中有一個碼錶 BitMap
,用來表示它裡面的每個條訊息是否被 Ack 和被存到磁碟。用 BitMap
可以加速匹配。
addAk
方法會嘗試從 buffer 中找 CheckPoint
來匹配。如果找到對應的 CheckPoint
,則修改它碼錶的對應位,表示這條訊息被 ACK。
scan
方法每 5ms 執行一次
- 將已經匹配或存檔的
CheckPoint
移出 buffer - 把超時的
CheckPoint
存入磁碟 - 對於匹配完成或者存檔的
CheckPoint
,為他們提交訊息偏移量
3.4.2 Store 匹配和訊息重試
從記憶體中移除儲存到磁碟的 CheckPoint
和 AckMsg
都會封裝成訊息進行定時投遞(定時到重試時間),最終投遞到 ReviveTopic
。儲存中匹配也由一個執行緒 PopReviveService
完成,它消費 ReviveTopic
的訊息進行匹配和重試。
Pop 消費由於要根據 Topic 來 Pop 訊息,重試 Topic 需要針對每個 [消費組-Topic] 隔離,所以它不能用普通訊息的消費組維度的重試 Topic,而是用專門的 Pop 重試 Topic %RETRY%{消費組}_{TOPIC}
。
PopReviveService#run
方法是該處理執行緒的入口,它每秒都會呼叫 consumeReviveMessage
消費和匹配 ReviveTopic 訊息,然後呼叫 mergeAndRevive
方法檢查匹配的情況並對達到喚醒時間還沒有成功匹配的訊息重試。
這兩個方法會先初始化一個 map,用於存放 CheckPoint
,供 AckMsg
根據 map key 查詢 CheckPoint
。
consumeReviveMessage
會消費 2s 內的一批 ReviveTopic 訊息,CK 訊息放入 map,Ack 訊息則從 map 中查詢 CK,在碼錶上標記對應的訊息為 Acked。
mergeAndRevive
方法如其名,遍歷消費到的 CK 訊息,對於已經到重試時間的,對沒有 Ack 的訊息進行重試。
重試邏輯為先從 MessageStore 查詢對應的真正訊息,然後將該訊息傳送到 Pop 重試佇列。
4. 原始碼解析
4.1 Broker 端重平衡
4.1.1 QueryAssignmentProcessor#doLoadBalance
/**
* Broker 端重平衡
* Returns empty set means the client should clear all load assigned to it before, null means invalid result and the
* client should skip the update logic
*
* @param topic
* @param consumerGroup
* @param clientId
* @param messageModel 消費模型(廣播/叢集)
* @param strategyName 重平衡策略名
* @return the MessageQueues assigned to this client
*/
private Set<MessageQueue> doLoadBalance(final String topic, final String consumerGroup, final String clientId,
final MessageModel messageModel, final String strategyName,
SetMessageRequestModeRequestBody setMessageRequestModeRequestBody, final ChannelHandlerContext ctx) {
Set<MessageQueue> assignedQueueSet = null;
final TopicRouteInfoManager topicRouteInfoManager = this.brokerController.getTopicRouteInfoManager();
switch (messageModel) {
case BROADCASTING: {
// 廣播模式,返回該 Topic 下所有佇列
assignedQueueSet = topicRouteInfoManager.getTopicSubscribeInfo(topic);
if (assignedQueueSet == null) {
log.warn("QueryLoad: no assignment for group[{}], the topic[{}] does not exist.", consumerGroup, topic);
}
break;
}
case CLUSTERING: {
// 叢集模式
// 獲取 Topic 下所有佇列
Set<MessageQueue> mqSet = topicRouteInfoManager.getTopicSubscribeInfo(topic);
if (null == mqSet) {
if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
log.warn("QueryLoad: no assignment for group[{}], the topic[{}] does not exist.", consumerGroup, topic);
}
return null;
}
if (!brokerController.getBrokerConfig().isServerLoadBalancerEnable()) {
return mqSet;
}
List<String> cidAll = null;
// 獲取發起請求的消費組資訊
ConsumerGroupInfo consumerGroupInfo = this.brokerController.getConsumerManager().getConsumerGroupInfo(consumerGroup);
if (consumerGroupInfo != null) {
cidAll = consumerGroupInfo.getAllClientId();
}
if (null == cidAll) {
log.warn("QueryLoad: no assignment for group[{}] topic[{}], get consumer id list failed", consumerGroup, topic);
return null;
}
List<MessageQueue> mqAll = new ArrayList<MessageQueue>();
mqAll.addAll(mqSet);
// 將佇列和消費者客戶端ID 排序
Collections.sort(mqAll);
Collections.sort(cidAll);
List<MessageQueue> allocateResult = null;
try {
// 根據重平衡策略名稱獲取策略
AllocateMessageQueueStrategy allocateMessageQueueStrategy = name2LoadStrategy.get(strategyName);
if (null == allocateMessageQueueStrategy) {
log.warn("QueryLoad: unsupported strategy [{}], {}", strategyName, RemotingHelper.parseChannelRemoteAddr(ctx.channel()));
return null;
}
if (setMessageRequestModeRequestBody != null && setMessageRequestModeRequestBody.getMode() == MessageRequestMode.POP) {
// POP 模式重平衡
allocateResult = allocate4Pop(allocateMessageQueueStrategy, consumerGroup, clientId, mqAll,
cidAll, setMessageRequestModeRequestBody.getPopShareQueueNum());
} else {
// 普通重平衡
allocateResult = allocateMessageQueueStrategy.allocate(consumerGroup, clientId, mqAll, cidAll);
}
} catch (Throwable e) {
log.error("QueryLoad: no assignment for group[{}] topic[{}], allocate message queue exception. strategy name: {}, ex: {}", consumerGroup, topic, strategyName, e);
return null;
}
assignedQueueSet = new HashSet<MessageQueue>();
if (allocateResult != null) {
assignedQueueSet.addAll(allocateResult);
}
break;
}
default:
break;
}
return assignedQueueSet;
}
4.1.2 QueryAssignmentProcessor#allocate4Pop
/**
* POP 模式重平衡
*
* @param allocateMessageQueueStrategy 重平衡策略
* @param consumerGroup 消費組
* @param clientId 消費組客戶端 ID
* @param mqAll 全部訊息佇列
* @param cidAll 全部客戶端ID
* @param popShareQueueNum Pop 模式下可允許被共享的佇列數,0 表示無限
* @return 該消費者負載的佇列列表
*/
public List<MessageQueue> allocate4Pop(AllocateMessageQueueStrategy allocateMessageQueueStrategy,
final String consumerGroup, final String clientId, List<MessageQueue> mqAll, List<String> cidAll,
int popShareQueueNum) {
List<MessageQueue> allocateResult;
if (popShareQueueNum <= 0 || popShareQueueNum >= cidAll.size() - 1) {
// 每個消費者能消費所有佇列,返回全部佇列。佇列 ID 為 -1 表示 Pop 消費時消費全部佇列
//each client pop all messagequeue
allocateResult = new ArrayList<>(mqAll.size());
for (MessageQueue mq : mqAll) {
//must create new MessageQueue in case of change cache in AssignmentManager
MessageQueue newMq = new MessageQueue(mq.getTopic(), mq.getBrokerName(), -1);
allocateResult.add(newMq);
}
} else {
if (cidAll.size() <= mqAll.size()) {
// 消費者數量小於等於佇列數量,每個消費者分配 N 個佇列,每個佇列也會被分配給多個消費者
//consumer working in pop mode could share the MessageQueues assigned to the N (N = popWorkGroupSize) consumer following it in the cid list
allocateResult = allocateMessageQueueStrategy.allocate(consumerGroup, clientId, mqAll, cidAll);
int index = cidAll.indexOf(clientId);
if (index >= 0) {
// 負載 popShareQueueNum 次,將每次負載的結果加入最終結果
for (int i = 1; i <= popShareQueueNum; i++) {
index++;
index = index % cidAll.size();
List<MessageQueue> tmp = allocateMessageQueueStrategy.allocate(consumerGroup, cidAll.get(index), mqAll, cidAll);
allocateResult.addAll(tmp);
}
}
} else {
// 消費者數量大於佇列數量,保證每個消費者都有佇列消費
//make sure each cid is assigned
allocateResult = allocate(consumerGroup, clientId, mqAll, cidAll);
}
}
return allocateResult;
}
4.2 Broker 端 Pop 訊息
4.2.1 PopMessageProcessor#processRequest
/**
* 處理 POP 訊息請求
*
* @param channel
* @param request
* @return
* @throws RemotingCommandException
*/
private RemotingCommand processRequest(final Channel channel, RemotingCommand request)
throws RemotingCommandException {
// ... 解析請求體和一系列校驗
// 生成隨機數
int randomQ = random.nextInt(100);
int reviveQid;
if (requestHeader.isOrder()) {
reviveQid = KeyBuilder.POP_ORDER_REVIVE_QUEUE;
} else {
// 輪詢選一個 Revive 佇列
reviveQid = (int) Math.abs(ckMessageNumber.getAndIncrement() % this.brokerController.getBrokerConfig().getReviveQueueNum());
}
int commercialSizePerMsg = this.brokerController.getBrokerConfig().getCommercialSizePerMsg();
GetMessageResult getMessageResult = new GetMessageResult(commercialSizePerMsg);
// 佇列中剩餘的訊息數量
long restNum = 0;
// 1/5 的概率拉取重試訊息
boolean needRetry = randomQ % 5 == 0;
long popTime = System.currentTimeMillis();
// 拉取重試訊息
if (needRetry && !requestHeader.isOrder()) {
TopicConfig retryTopicConfig =
this.brokerController.getTopicConfigManager().selectTopicConfig(KeyBuilder.buildPopRetryTopic(requestHeader.getTopic(), requestHeader.getConsumerGroup()));
if (retryTopicConfig != null) {
for (int i = 0; i < retryTopicConfig.getReadQueueNums(); i++) {
int queueId = (randomQ + i) % retryTopicConfig.getReadQueueNums();
restNum = popMsgFromQueue(true, getMessageResult, requestHeader, queueId, restNum, reviveQid,
channel, popTime, messageFilter,
startOffsetInfo, msgOffsetInfo, orderCountInfo);
}
}
}
// 如果拉取請求沒有指定佇列(-1),則拉取所有佇列
if (requestHeader.getQueueId() < 0) {
// read all queue
for (int i = 0; i < topicConfig.getReadQueueNums(); i++) {
int queueId = (randomQ + i) % topicConfig.getReadQueueNums();
restNum = popMsgFromQueue(false, getMessageResult, requestHeader, queueId, restNum, reviveQid, channel, popTime, messageFilter,
startOffsetInfo, msgOffsetInfo, orderCountInfo);
}
} else {
// 拉取請求指定了佇列,拉取對應的佇列
int queueId = requestHeader.getQueueId();
restNum = popMsgFromQueue(false, getMessageResult, requestHeader, queueId, restNum, reviveQid, channel,
popTime, messageFilter,
startOffsetInfo, msgOffsetInfo, orderCountInfo);
}
// 如果前面拉取普通訊息之後,沒有滿,則再拉取一次重試訊息
// if not full , fetch retry again
if (!needRetry && getMessageResult.getMessageMapedList().size() < requestHeader.getMaxMsgNums() && !requestHeader.isOrder()) {
TopicConfig retryTopicConfig =
this.brokerController.getTopicConfigManager().selectTopicConfig(KeyBuilder.buildPopRetryTopic(requestHeader.getTopic(), requestHeader.getConsumerGroup()));
if (retryTopicConfig != null) {
for (int i = 0; i < retryTopicConfig.getReadQueueNums(); i++) {
int queueId = (randomQ + i) % retryTopicConfig.getReadQueueNums();
restNum = popMsgFromQueue(true, getMessageResult, requestHeader, queueId, restNum, reviveQid,
channel, popTime, messageFilter,
startOffsetInfo, msgOffsetInfo, orderCountInfo);
}
}
}
// 拉取訊息成功
if (!getMessageResult.getMessageBufferList().isEmpty()) {
response.setCode(ResponseCode.SUCCESS);
getMessageResult.setStatus(GetMessageStatus.FOUND);
if (restNum > 0) {
// all queue pop can not notify specified queue pop, and vice versa
notifyMessageArriving(requestHeader.getTopic(), requestHeader.getConsumerGroup(),
requestHeader.getQueueId());
}
} else {
// 沒有拉取到訊息,長輪詢
int pollingResult = polling(channel, request, requestHeader);
if (POLLING_SUC == pollingResult) {
return null;
} else if (POLLING_FULL == pollingResult) {
response.setCode(ResponseCode.POLLING_FULL);
} else {
response.setCode(ResponseCode.POLLING_TIMEOUT);
}
getMessageResult.setStatus(GetMessageStatus.NO_MESSAGE_IN_QUEUE);
}
responseHeader.setInvisibleTime(requestHeader.getInvisibleTime());
responseHeader.setPopTime(popTime);
responseHeader.setReviveQid(reviveQid);
responseHeader.setRestNum(restNum);
responseHeader.setStartOffsetInfo(startOffsetInfo.toString());
responseHeader.setMsgOffsetInfo(msgOffsetInfo.toString());
if (requestHeader.isOrder() && orderCountInfo != null) {
responseHeader.setOrderCountInfo(orderCountInfo.toString());
}
response.setRemark(getMessageResult.getStatus().name());
// 傳輸訊息
return response;
}
4.2.2 PopMessageProcessor#popMsgFromQueue
/**
* 從訊息佇列中 POP 訊息
*
* @param isRetry 是否是重試 Topic
* @param getMessageResult
* @param requestHeader
* @param queueId 訊息佇列 ID
* @param restNum 佇列剩餘訊息數量
* @param reviveQid 喚醒佇列 ID
* @param channel Netty Channel,用於獲取客戶端 host,來提交消費進度
* @param popTime Pop 時間
* @param messageFilter
* @param startOffsetInfo 獲取 Pop 的起始偏移量
* @param msgOffsetInfo 獲取所有 Pop 的訊息的邏輯偏移量
* @param orderCountInfo
* @return 佇列剩餘訊息
*/
private long popMsgFromQueue(boolean isRetry, GetMessageResult getMessageResult,
PopMessageRequestHeader requestHeader, int queueId, long restNum, int reviveQid,
Channel channel, long popTime,
ExpressionMessageFilter messageFilter, StringBuilder startOffsetInfo,
StringBuilder msgOffsetInfo, StringBuilder orderCountInfo) {
String topic = isRetry ? KeyBuilder.buildPopRetryTopic(requestHeader.getTopic(),
requestHeader.getConsumerGroup()) : requestHeader.getTopic();
// {TOPIC}@{GROUP}@{QUEUE_ID}
String lockKey =
topic + PopAckConstants.SPLIT + requestHeader.getConsumerGroup() + PopAckConstants.SPLIT + queueId;
boolean isOrder = requestHeader.isOrder();
long offset = getPopOffset(topic, requestHeader, queueId, false, lockKey);
// Queue 上加鎖,保證同一時刻只有一個消費者可以拉取同一個 Queue 的訊息
if (!queueLockManager.tryLock(lockKey)) {
// 返回該佇列中待 Pop 的訊息數量
restNum = this.brokerController.getMessageStore().getMaxOffsetInQueue(topic, queueId) - offset + restNum;
return restNum;
}
// 計算要 POP 的訊息偏移量
offset = getPopOffset(topic, requestHeader, queueId, true, lockKey);
GetMessageResult getMessageTmpResult = null;
try {
// 順序消費,阻塞
if (isOrder && brokerController.getConsumerOrderInfoManager().checkBlock(topic,
requestHeader.getConsumerGroup(), queueId, requestHeader.getInvisibleTime())) {
return this.brokerController.getMessageStore().getMaxOffsetInQueue(topic, queueId) - offset + restNum;
}
// 已經拉取到足夠的訊息
if (getMessageResult.getMessageMapedList().size() >= requestHeader.getMaxMsgNums()) {
restNum =
this.brokerController.getMessageStore().getMaxOffsetInQueue(topic, queueId) - offset + restNum;
return restNum;
}
// 從磁碟訊息儲存中根據邏輯偏移量查詢訊息
getMessageTmpResult = this.brokerController.getMessageStore().getMessage(requestHeader.getConsumerGroup()
, topic, queueId, offset,
requestHeader.getMaxMsgNums() - getMessageResult.getMessageMapedList().size(), messageFilter);
if (getMessageTmpResult == null) {
return this.brokerController.getMessageStore().getMaxOffsetInQueue(topic, queueId) - offset + restNum;
}
// maybe store offset is not correct.
if (GetMessageStatus.OFFSET_TOO_SMALL.equals(getMessageTmpResult.getStatus())
|| GetMessageStatus.OFFSET_OVERFLOW_BADLY.equals(getMessageTmpResult.getStatus())
|| GetMessageStatus.OFFSET_FOUND_NULL.equals(getMessageTmpResult.getStatus())) {
// commit offset, because the offset is not correct
// If offset in store is greater than cq offset, it will cause duplicate messages,
// because offset in PopBuffer is not committed.
POP_LOGGER.warn("Pop initial offset, because store is no correct, {}, {}->{}",
lockKey, offset, getMessageTmpResult.getNextBeginOffset());
offset = getMessageTmpResult.getNextBeginOffset();
this.brokerController.getConsumerOffsetManager().commitOffset(channel.remoteAddress().toString(), requestHeader.getConsumerGroup(), topic,
queueId, offset);
getMessageTmpResult =
this.brokerController.getMessageStore().getMessage(requestHeader.getConsumerGroup(), topic,
queueId, offset,
requestHeader.getMaxMsgNums() - getMessageResult.getMessageMapedList().size(), messageFilter);
}
// 計算佇列還剩下的訊息數量
restNum = getMessageTmpResult.getMaxOffset() - getMessageTmpResult.getNextBeginOffset() + restNum;
if (!getMessageTmpResult.getMessageMapedList().isEmpty()) {
// 更新統計資料
this.brokerController.getBrokerStatsManager().incBrokerGetNums(getMessageTmpResult.getMessageCount());
this.brokerController.getBrokerStatsManager().incGroupGetNums(requestHeader.getConsumerGroup(), topic,
getMessageTmpResult.getMessageCount());
this.brokerController.getBrokerStatsManager().incGroupGetSize(requestHeader.getConsumerGroup(), topic,
getMessageTmpResult.getBufferTotalSize());
if (isOrder) {
// 順序消費,更新偏移量
int count = brokerController.getConsumerOrderInfoManager().update(topic,
requestHeader.getConsumerGroup(),
queueId, getMessageTmpResult.getMessageQueueOffset());
this.brokerController.getConsumerOffsetManager().commitOffset(channel.remoteAddress().toString(),
requestHeader.getConsumerGroup(), topic, queueId, offset);
ExtraInfoUtil.buildOrderCountInfo(orderCountInfo, isRetry, queueId, count);
} else {
// 新增 CheckPoint 到記憶體,用於等待 ACK
appendCheckPoint(requestHeader, topic, reviveQid, queueId, offset, getMessageTmpResult, popTime, this.brokerController.getBrokerConfig().getBrokerName());
}
ExtraInfoUtil.buildStartOffsetInfo(startOffsetInfo, isRetry, queueId, offset);
ExtraInfoUtil.buildMsgOffsetInfo(msgOffsetInfo, isRetry, queueId,
getMessageTmpResult.getMessageQueueOffset());
} else if ((GetMessageStatus.NO_MATCHED_MESSAGE.equals(getMessageTmpResult.getStatus())
|| GetMessageStatus.OFFSET_FOUND_NULL.equals(getMessageTmpResult.getStatus())
|| GetMessageStatus.MESSAGE_WAS_REMOVING.equals(getMessageTmpResult.getStatus())
|| GetMessageStatus.NO_MATCHED_LOGIC_QUEUE.equals(getMessageTmpResult.getStatus()))
&& getMessageTmpResult.getNextBeginOffset() > -1) {
// 沒有拉取到訊息,新增假的訊息 CheckPoint 到佇列
popBufferMergeService.addCkMock(requestHeader.getConsumerGroup(), topic, queueId, offset,
requestHeader.getInvisibleTime(), popTime, reviveQid, getMessageTmpResult.getNextBeginOffset(), brokerController.getBrokerConfig().getBrokerName());
// this.brokerController.getConsumerOffsetManager().commitOffset(channel.remoteAddress().toString(), requestHeader.getConsumerGroup(), topic,
// queueId, getMessageTmpResult.getNextBeginOffset());
}
} catch (Exception e) {
POP_LOGGER.error("Exception in popMsgFromQueue", e);
} finally {
// Pop 完後解鎖
queueLockManager.unLock(lockKey);
}
// 將拉取到的訊息放入結果容器中
if (getMessageTmpResult != null) {
for (SelectMappedBufferResult mapedBuffer : getMessageTmpResult.getMessageMapedList()) {
getMessageResult.addMessage(mapedBuffer);
}
}
return restNum;
}
4.2.3 PopMessageProcessor#appendCheckPoint
/**
* 在 POP 拉取訊息後呼叫,新增 CheckPoint,等待 ACK
*
* @param requestHeader
* @param topic POP 的 Topic
* @param reviveQid Revive 佇列 ID
* @param queueId POP 的佇列 ID
* @param offset POP 訊息的起始偏移量
* @param getMessageTmpResult POP 一批訊息的結果
* @param popTime POP 時間
* @param brokerName
*/
private void appendCheckPoint(final PopMessageRequestHeader requestHeader,
final String topic, final int reviveQid, final int queueId, final long offset,
final GetMessageResult getMessageTmpResult, final long popTime, final String brokerName) {
// add check point msg to revive log
final PopCheckPoint ck = new PopCheckPoint();
// ... 構造 PopCheckPoint,賦值過程省略
for (Long msgQueueOffset : getMessageTmpResult.getMessageQueueOffset()) {
// 新增所有拉取的訊息的偏移量與起始偏移量的差值
ck.addDiff((int) (msgQueueOffset - offset));
}
// 將 Offset 放入記憶體
final boolean addBufferSuc = this.popBufferMergeService.addCk(
ck, reviveQid, -1, getMessageTmpResult.getNextBeginOffset()
);
if (addBufferSuc) {
return;
}
// 放入記憶體匹配失敗(記憶體匹配未開啟),將 Offset 放入記憶體和磁碟
this.popBufferMergeService.addCkJustOffset(
ck, reviveQid, -1, getMessageTmpResult.getNextBeginOffset()
);
}
4.3 Broker 端 Ack 訊息
4.3.1 AckMessageProcessor#processRequest
/**
* 處理 Ack 訊息請求,每次 Ack 一條訊息
*
* @param channel
* @param request
* @param brokerAllowSuspend
* @return
* @throws RemotingCommandException
*/
private RemotingCommand processRequest(final Channel channel, RemotingCommand request,
boolean brokerAllowSuspend) throws RemotingCommandException {
// 解析請求頭
final AckMessageRequestHeader requestHeader = (AckMessageRequestHeader) request.decodeCommandCustomHeader(AckMessageRequestHeader.class);
MessageExtBrokerInner msgInner = new MessageExtBrokerInner();
AckMsg ackMsg = new AckMsg();
RemotingCommand response = RemotingCommand.createResponseCommand(ResponseCode.SUCCESS, null);
response.setOpaque(request.getOpaque());
// ... 校驗
// 拆分訊息控制代碼字串
String[] extraInfo = ExtraInfoUtil.split(requestHeader.getExtraInfo());
// 用請求頭中的資訊構造 AckMsg
ackMsg.setAckOffset(requestHeader.getOffset());
ackMsg.setStartOffset(ExtraInfoUtil.getCkQueueOffset(extraInfo));
ackMsg.setConsumerGroup(requestHeader.getConsumerGroup());
ackMsg.setTopic(requestHeader.getTopic());
ackMsg.setQueueId(requestHeader.getQueueId());
ackMsg.setPopTime(ExtraInfoUtil.getPopTime(extraInfo));
ackMsg.setBrokerName(ExtraInfoUtil.getBrokerName(extraInfo));
int rqId = ExtraInfoUtil.getReviveQid(extraInfo);
this.brokerController.getBrokerStatsManager().incBrokerAckNums(1);
this.brokerController.getBrokerStatsManager().incGroupAckNums(requestHeader.getConsumerGroup(), requestHeader.getTopic(), 1);
if (rqId == KeyBuilder.POP_ORDER_REVIVE_QUEUE) {
// ... 順序訊息 ACK
}
// 普通訊息 ACK
// 先嚐試放入記憶體匹配,成功則直接返回。失敗可能是記憶體匹配未開啟
if (this.brokerController.getPopMessageProcessor().getPopBufferMergeService().addAk(rqId, ackMsg)) {
return response;
}
// 構造 Ack 訊息
msgInner.setTopic(reviveTopic);
msgInner.setBody(JSON.toJSONString(ackMsg).getBytes(DataConverter.charset));
//msgInner.setQueueId(Integer.valueOf(extraInfo[3]));
msgInner.setQueueId(rqId);
msgInner.setTags(PopAckConstants.ACK_TAG);
msgInner.setBornTimestamp(System.currentTimeMillis());
msgInner.setBornHost(this.brokerController.getStoreHost());
msgInner.setStoreHost(this.brokerController.getStoreHost());
// 定時訊息,定時到喚醒重試時間投遞
msgInner.setDeliverTimeMs(ExtraInfoUtil.getPopTime(extraInfo) + ExtraInfoUtil.getInvisibleTime(extraInfo));
msgInner.getProperties().put(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX, PopMessageProcessor.genAckUniqueId(ackMsg));
msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgInner.getProperties()));
// 儲存 Ack 訊息到磁碟
PutMessageResult putMessageResult = this.brokerController.getEscapeBridge().putMessageToSpecificQueue(msgInner);
if (putMessageResult.getPutMessageStatus() != PutMessageStatus.PUT_OK
&& putMessageResult.getPutMessageStatus() != PutMessageStatus.FLUSH_DISK_TIMEOUT
&& putMessageResult.getPutMessageStatus() != PutMessageStatus.FLUSH_SLAVE_TIMEOUT
&& putMessageResult.getPutMessageStatus() != PutMessageStatus.SLAVE_NOT_AVAILABLE) {
POP_LOGGER.error("put ack msg error:" + putMessageResult);
}
return response;
}
4.4 Broker 端 CheckPoint
與 AckMsg
匹配
4.4.1 PopBufferMergeService#addCk
/**
* POP 訊息後,新增 CheckPoint,放入記憶體 Buffer
*
* @param point
* @param reviveQueueId
* @param reviveQueueOffset
* @param nextBeginOffset
* @return 是否新增成功
*/
public boolean addCk(PopCheckPoint point, int reviveQueueId, long reviveQueueOffset, long nextBeginOffset) {
// key: point.getT() + point.getC() + point.getQ() + point.getSo() + point.getPt()
if (!brokerController.getBrokerConfig().isEnablePopBufferMerge()) {
return false;
}
// 記憶體匹配服務是否開啟
if (!serving) {
return false;
}
// 距離下次可重試 Pop 消費的時刻 < 4.5s
long now = System.currentTimeMillis();
if (point.getReviveTime() - now < brokerController.getBrokerConfig().getPopCkStayBufferTimeOut() + 1500) {
if (brokerController.getBrokerConfig().isEnablePopLog()) {
POP_LOGGER.warn("[PopBuffer]add ck, timeout, {}, {}", point, now);
}
return false;
}
if (this.counter.get() > brokerController.getBrokerConfig().getPopCkMaxBufferSize()) {
POP_LOGGER.warn("[PopBuffer]add ck, max size, {}, {}", point, this.counter.get());
return false;
}
PopCheckPointWrapper pointWrapper = new PopCheckPointWrapper(reviveQueueId, reviveQueueOffset, point, nextBeginOffset);
if (!checkQueueOk(pointWrapper)) {
return false;
}
// 將 CheckPoint 放入 Offset 佇列
putOffsetQueue(pointWrapper);
// 將 CheckPoint 放入記憶體 Buffer
this.buffer.put(pointWrapper.getMergeKey(), pointWrapper);
this.counter.incrementAndGet();
if (brokerController.getBrokerConfig().isEnablePopLog()) {
POP_LOGGER.info("[PopBuffer]add ck, {}", pointWrapper);
}
return true;
}
4.4.2 PopBufferMergeService#addAk
/**
* 訊息 ACK,與記憶體中的 CheckPoint 匹配
*
* @param reviveQid
* @param ackMsg
* @return 是否匹配成功
*/
public boolean addAk(int reviveQid, AckMsg ackMsg) {
// 如果未開啟記憶體匹配,直接返回
if (!brokerController.getBrokerConfig().isEnablePopBufferMerge()) {
return false;
}
if (!serving) {
return false;
}
try {
// 根據 ACK 的訊息找到記憶體 Buffer 中的 CheckPoint
PopCheckPointWrapper pointWrapper = this.buffer.get(ackMsg.getTopic() + ackMsg.getConsumerGroup() + ackMsg.getQueueId() + ackMsg.getStartOffset() + ackMsg.getPopTime() + ackMsg.getBrokerName());
if (pointWrapper == null) {
// 找不到 CheckPoint
if (brokerController.getBrokerConfig().isEnablePopLog()) {
POP_LOGGER.warn("[PopBuffer]add ack fail, rqId={}, no ck, {}", reviveQid, ackMsg);
}
return false;
}
// 記憶體中僅儲存 Offset,實際已經儲存到磁碟,記憶體中不處理 ACK 訊息的匹配,直接返回
if (pointWrapper.isJustOffset()) {
return false;
}
PopCheckPoint point = pointWrapper.getCk();
long now = System.currentTimeMillis();
if (point.getReviveTime() - now < brokerController.getBrokerConfig().getPopCkStayBufferTimeOut() + 1500) {
if (brokerController.getBrokerConfig().isEnablePopLog()) {
POP_LOGGER.warn("[PopBuffer]add ack fail, rqId={}, almost timeout for revive, {}, {}, {}", reviveQid, pointWrapper, ackMsg, now);
}
return false;
}
if (now - point.getPopTime() > brokerController.getBrokerConfig().getPopCkStayBufferTime() - 1500) {
if (brokerController.getBrokerConfig().isEnablePopLog()) {
POP_LOGGER.warn("[PopBuffer]add ack fail, rqId={}, stay too long, {}, {}, {}", reviveQid, pointWrapper, ackMsg, now);
}
return false;
}
// 標記該 CheckPoint 已經被 ACK
int indexOfAck = point.indexOfAck(ackMsg.getAckOffset());
if (indexOfAck > -1) {
// 設定 CheckPoint 中被 Ack 訊息的 bit 碼錶為 1
markBitCAS(pointWrapper.getBits(), indexOfAck);
} else {
POP_LOGGER.error("[PopBuffer]Invalid index of ack, reviveQid={}, {}, {}", reviveQid, ackMsg, point);
return true;
}
return true;
} catch (Throwable e) {
POP_LOGGER.error("[PopBuffer]add ack error, rqId=" + reviveQid + ", " + ackMsg, e);
}
return false;
}
4.4.3 PopBufferMergeService#scan
/**
* 掃描記憶體中的 CheckPoint
* 把已經匹配或存檔的 CheckPoint 移出 buffer
* 把已經全部 Ack 的 CheckPoint 存檔
*/
private void scan() {
long startTime = System.currentTimeMillis();
int count = 0, countCk = 0;
Iterator<Map.Entry<String, PopCheckPointWrapper>> iterator = buffer.entrySet().iterator();
// 遍歷所有記憶體中的 CheckPoint
while (iterator.hasNext()) {
Map.Entry<String, PopCheckPointWrapper> entry = iterator.next();
PopCheckPointWrapper pointWrapper = entry.getValue();
// 如果 CheckPoint 已經在磁碟中,或者全部訊息都匹配成功,從記憶體中 buffer 中移除
// just process offset(already stored at pull thread), or buffer ck(not stored and ack finish)
if (pointWrapper.isJustOffset() && pointWrapper.isCkStored() || isCkDone(pointWrapper)
|| isCkDoneForFinish(pointWrapper) && pointWrapper.isCkStored()) {
iterator.remove();
counter.decrementAndGet();
continue;
}
PopCheckPoint point = pointWrapper.getCk();
long now = System.currentTimeMillis();
// 是否要從記憶體中移除 CheckPoint
boolean removeCk = !this.serving;
// 距離 ReviveTime 時間小於閾值(預設3s)
// ck will be timeout
if (point.getReviveTime() - now < brokerController.getBrokerConfig().getPopCkStayBufferTimeOut()) {
removeCk = true;
}
// 在記憶體中時間大於閾值(預設10s)
// the time stayed is too long
if (now - point.getPopTime() > brokerController.getBrokerConfig().getPopCkStayBufferTime()) {
removeCk = true;
}
if (now - point.getPopTime() > brokerController.getBrokerConfig().getPopCkStayBufferTime() * 2L) {
POP_LOGGER.warn("[PopBuffer]ck finish fail, stay too long, {}", pointWrapper);
}
// double check
if (isCkDone(pointWrapper)) {
continue;
} else if (pointWrapper.isJustOffset()) {
// just offset should be in store.
if (pointWrapper.getReviveQueueOffset() < 0) {
putCkToStore(pointWrapper, false);
countCk++;
}
continue;
} else if (removeCk) {
// 將 CheckPoint 包裝成訊息放入磁碟,從記憶體中移除
// put buffer ak to store
if (pointWrapper.getReviveQueueOffset() < 0) {
putCkToStore(pointWrapper, false);
countCk++;
}
if (!pointWrapper.isCkStored()) {
continue;
}
// 在記憶體中移除 CheckPoint 前,把它當中已經 Ack 的訊息也作為 Ack 訊息存入磁碟
for (byte i = 0; i < point.getNum(); i++) {
// 遍歷 CheckPoint 中訊息 bit 碼錶每一位,檢查是否已經 Ack 並且沒有存入磁碟
// reput buffer ak to store
if (DataConverter.getBit(pointWrapper.getBits().get(), i)
&& !DataConverter.getBit(pointWrapper.getToStoreBits().get(), i)) {
if (putAckToStore(pointWrapper, i)) {
count++;
markBitCAS(pointWrapper.getToStoreBits(), i);
}
}
}
if (isCkDoneForFinish(pointWrapper) && pointWrapper.isCkStored()) {
if (brokerController.getBrokerConfig().isEnablePopLog()) {
POP_LOGGER.info("[PopBuffer]ck finish, {}", pointWrapper);
}
iterator.remove();
counter.decrementAndGet();
continue;
}
}
}
// 掃描已經完成的 CheckPoint,為它們提交訊息消費進度
int offsetBufferSize = scanCommitOffset();
scanTimes++;
if (scanTimes >= countOfMinute1) {
counter.set(this.buffer.size());
scanTimes = 0;
}
}
4.4.4 PopReviveService#consumeReviveMessage
/**
* 消費 Revive Topic 中的訊息,匹配 ACK 訊息和 CheckPoint
* CK 訊息放到 Map 中,ACK 訊息根據 Map key 匹配 CK 訊息,更新 CK 訊息的碼錶以完成 ACK
* 只對 CK 進行標記
* 消費時間差 2s 內的 CK、ACK 訊息,或 4s 沒有消費到新訊息
*
* @param consumeReviveObj CK 與 ACK 匹配物件,用於 Revive 需要重試 Pop 消費的訊息
*/
protected void consumeReviveMessage(ConsumeReviveObj consumeReviveObj) {
// CheckPoint 匹配 map,key = point.getTopic() + point.getCId() + point.getQueueId() + point.getStartOffset() + point.getPopTime()
HashMap<String, PopCheckPoint> map = consumeReviveObj.map;
long startScanTime = System.currentTimeMillis();
long endTime = 0;
// 查詢 ReviveTopic queue 之前的消費進度
long oldOffset = this.brokerController.getConsumerOffsetManager().queryOffset(PopAckConstants.REVIVE_GROUP, reviveTopic, queueId);
consumeReviveObj.oldOffset = oldOffset;
POP_LOGGER.info("reviveQueueId={}, old offset is {} ", queueId, oldOffset);
long offset = oldOffset + 1;
// 沒有查詢到訊息的次數
int noMsgCount = 0;
long firstRt = 0;
// offset self amend
while (true) {
if (!shouldRunPopRevive) {
POP_LOGGER.info("slave skip scan , revive topic={}, reviveQueueId={}", reviveTopic, queueId);
break;
}
// 查詢一批 Revive Topic 中的訊息(32條)
List<MessageExt> messageExts = getReviveMessage(offset, queueId);
if (messageExts == null || messageExts.isEmpty()) {
long old = endTime;
long timerDelay = brokerController.getMessageStore().getTimerMessageStore().getReadBehind();
long commitLogDelay = brokerController.getMessageStore().getTimerMessageStore().getEnqueueBehind();
// move endTime
if (endTime != 0 && System.currentTimeMillis() - endTime > 3 * PopAckConstants.SECOND && timerDelay <= 0 && commitLogDelay <= 0) {
endTime = System.currentTimeMillis();
}
POP_LOGGER.info("reviveQueueId={}, offset is {}, can not get new msg, old endTime {}, new endTime {}",
queueId, offset, old, endTime);
// 最後一個 CK 的喚醒時間與第一個 CK 的喚醒時間差大於 2s,中斷消費
if (endTime - firstRt > PopAckConstants.ackTimeInterval + PopAckConstants.SECOND) {
break;
}
noMsgCount++;
// Fixme: why sleep is useful here?
try {
Thread.sleep(100);
} catch (Throwable ignore) {
}
// 連續 4s 沒有消費到新的訊息,中斷消費
if (noMsgCount * 100L > 4 * PopAckConstants.SECOND) {
break;
} else {
continue;
}
} else {
noMsgCount = 0;
}
if (System.currentTimeMillis() - startScanTime > brokerController.getBrokerConfig().getReviveScanTime()) {
POP_LOGGER.info("reviveQueueId={}, scan timeout ", queueId);
break;
}
// 遍歷查詢到的訊息
for (MessageExt messageExt : messageExts) {
if (PopAckConstants.CK_TAG.equals(messageExt.getTags())) {
// 如果是 CheckPoint
String raw = new String(messageExt.getBody(), DataConverter.charset);
if (brokerController.getBrokerConfig().isEnablePopLog()) {
POP_LOGGER.info("reviveQueueId={},find ck, offset:{}, raw : {}", messageExt.getQueueId(), messageExt.getQueueOffset(), raw);
}
PopCheckPoint point = JSON.parseObject(raw, PopCheckPoint.class);
if (point.getTopic() == null || point.getCId() == null) {
continue;
}
// 放入 HashMap,等待 ACK 訊息匹配
map.put(point.getTopic() + point.getCId() + point.getQueueId() + point.getStartOffset() + point.getPopTime(), point);
// 設定 reviveOffset 為 revive 佇列中訊息的邏輯 offset
point.setReviveOffset(messageExt.getQueueOffset());
if (firstRt == 0) {
firstRt = point.getReviveTime();
}
} else if (PopAckConstants.ACK_TAG.equals(messageExt.getTags())) {
// 如果是 ACK 訊息
String raw = new String(messageExt.getBody(), DataConverter.charset);
if (brokerController.getBrokerConfig().isEnablePopLog()) {
POP_LOGGER.info("reviveQueueId={},find ack, offset:{}, raw : {}", messageExt.getQueueId(), messageExt.getQueueOffset(), raw);
}
AckMsg ackMsg = JSON.parseObject(raw, AckMsg.class);
PopCheckPoint point = map.get(ackMsg.getTopic() + ackMsg.getConsumerGroup() + ackMsg.getQueueId() + ackMsg.getStartOffset() + ackMsg.getPopTime());
if (point == null) {
continue;
}
// 如果 HashMap 中有 CheckPoint,計算 ACK 的 bit 碼錶
int indexOfAck = point.indexOfAck(ackMsg.getAckOffset());
if (indexOfAck > -1) {
// Ack 訊息 bit 碼錶為 1 的位 Ack 成功
point.setBitMap(DataConverter.setBit(point.getBitMap(), indexOfAck, true));
} else {
POP_LOGGER.error("invalid ack index, {}, {}", ackMsg, point);
}
}
long deliverTime = messageExt.getDeliverTimeMs();
if (deliverTime > endTime) {
endTime = deliverTime;
}
}
offset = offset + messageExts.size();
}
consumeReviveObj.endTime = endTime;
}
4.4.5 PopReviveService#mergeAndRevive
/**
* 匹配消費到的一批 CK 和 ACK 訊息,對於沒有成功 ACK 的訊息,重發到重試 Topic
*/
protected void mergeAndRevive(ConsumeReviveObj consumeReviveObj) throws Throwable {
// 獲取排序後的 CheckPoint 列表
ArrayList<PopCheckPoint> sortList = consumeReviveObj.genSortList();
// ...
long newOffset = consumeReviveObj.oldOffset;
for (PopCheckPoint popCheckPoint : sortList) {
// ...
// 如果沒有到 Revive 時間,跳過
if (consumeReviveObj.endTime - popCheckPoint.getReviveTime() <= (PopAckConstants.ackTimeInterval + PopAckConstants.SECOND)) {
break;
}
// 從 CK 中解析原 Topic 並檢查該 Topic 是否存在,如果不存在則跳過
// check normal topic, skip ck , if normal topic is not exist
String normalTopic = KeyBuilder.parseNormalTopic(popCheckPoint.getTopic(), popCheckPoint.getCId());
if (brokerController.getTopicConfigManager().selectTopicConfig(normalTopic) == null) {
POP_LOGGER.warn("reviveQueueId={},can not get normal topic {} , then continue ", queueId, popCheckPoint.getTopic());
newOffset = popCheckPoint.getReviveOffset();
continue;
}
if (null == brokerController.getSubscriptionGroupManager().findSubscriptionGroupConfig(popCheckPoint.getCId())) {
POP_LOGGER.warn("reviveQueueId={},can not get cid {} , then continue ", queueId, popCheckPoint.getCId());
newOffset = popCheckPoint.getReviveOffset();
continue;
}
// 重發 CK 中沒有 Ack 的所有訊息
reviveMsgFromCk(popCheckPoint);
newOffset = popCheckPoint.getReviveOffset();
}
// 匹配和重試完成後,更新 ReviveTopic 消費進度
if (newOffset > consumeReviveObj.oldOffset) {
if (!shouldRunPopRevive) {
POP_LOGGER.info("slave skip commit, revive topic={}, reviveQueueId={}", reviveTopic, queueId);
return;
}
this.brokerController.getConsumerOffsetManager().commitOffset(PopAckConstants.LOCAL_HOST, PopAckConstants.REVIVE_GROUP, reviveTopic, queueId, newOffset);
}
consumeReviveObj.newOffset = newOffset;
}
4.4.6 PopReviveService
: 重試訊息
/**
* 重發 CK 中沒有 Ack 的所有訊息
*/
private void reviveMsgFromCk(PopCheckPoint popCheckPoint) throws Throwable {
// 遍歷 CK 中的所有訊息
for (int j = 0; j < popCheckPoint.getNum(); j++) {
if (DataConverter.getBit(popCheckPoint.getBitMap(), j)) {
continue;
}
// retry msg
long msgOffset = popCheckPoint.ackOffsetByIndex((byte) j);
// 查詢 CK 訊息對應的真正訊息
MessageExt messageExt = getBizMessage(popCheckPoint.getTopic(), msgOffset, popCheckPoint.getQueueId(), popCheckPoint.getBrokerName());
if (messageExt == null) {
POP_LOGGER.warn("reviveQueueId={},can not get biz msg topic is {}, offset is {} , then continue ",
queueId, popCheckPoint.getTopic(), msgOffset);
continue;
}
//skip ck from last epoch
if (popCheckPoint.getPopTime() < messageExt.getStoreTimestamp()) {
POP_LOGGER.warn("reviveQueueId={},skip ck from last epoch {}", queueId, popCheckPoint);
continue;
}
// 喚醒沒有被 ACK 的訊息,發到重試佇列
reviveRetry(popCheckPoint, messageExt);
}
}
/**
* 根據 CheckPoint 喚醒沒有被 ACK 的訊息,發到重試佇列
*
* @param popCheckPoint CK
* @param messageExt 要被重試的訊息
* @throws Exception
*/
private void reviveRetry(PopCheckPoint popCheckPoint, MessageExt messageExt) throws Exception {
if (!shouldRunPopRevive) {
POP_LOGGER.info("slave skip retry , revive topic={}, reviveQueueId={}", reviveTopic, queueId);
return;
}
// 構造新的訊息
MessageExtBrokerInner msgInner = new MessageExtBrokerInner();
// 喚醒的訊息發到重試 Topic
if (!popCheckPoint.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
msgInner.setTopic(KeyBuilder.buildPopRetryTopic(popCheckPoint.getTopic(), popCheckPoint.getCId()));
} else {
msgInner.setTopic(popCheckPoint.getTopic());
}
msgInner.setBody(messageExt.getBody());
msgInner.setQueueId(0);
if (messageExt.getTags() != null) {
msgInner.setTags(messageExt.getTags());
} else {
MessageAccessor.setProperties(msgInner, new HashMap<String, String>());
}
msgInner.setBornTimestamp(messageExt.getBornTimestamp());
msgInner.setBornHost(brokerController.getStoreHost());
msgInner.setStoreHost(brokerController.getStoreHost());
// 重試次數 += 1
msgInner.setReconsumeTimes(messageExt.getReconsumeTimes() + 1);
msgInner.getProperties().putAll(messageExt.getProperties());
if (messageExt.getReconsumeTimes() == 0 || msgInner.getProperties().get(MessageConst.PROPERTY_FIRST_POP_TIME) == null) {
msgInner.getProperties().put(MessageConst.PROPERTY_FIRST_POP_TIME, String.valueOf(popCheckPoint.getPopTime()));
}
msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgInner.getProperties()));
// 新增 Pop 重試 Topic
addRetryTopicIfNoExit(msgInner.getTopic(), popCheckPoint.getCId());
// 儲存重試訊息到儲存
PutMessageResult putMessageResult = brokerController.getEscapeBridge().putMessageToSpecificQueue(msgInner);
if (brokerController.getBrokerConfig().isEnablePopLog()) {
POP_LOGGER.info("reviveQueueId={},retry msg , ck={}, msg queueId {}, offset {}, reviveDelay={}, result is {} ",
queueId, popCheckPoint, messageExt.getQueueId(), messageExt.getQueueOffset(),
(System.currentTimeMillis() - popCheckPoint.getReviveTime()) / 1000, putMessageResult);
}
if (putMessageResult.getAppendMessageResult() == null || putMessageResult.getAppendMessageResult().getStatus() != AppendMessageStatus.PUT_OK) {
throw new Exception("reviveQueueId=" + queueId + ",revive error ,msg is :" + msgInner);
}
// ... 更新統計資料
if (brokerController.getPopMessageProcessor() != null) {
brokerController.getPopMessageProcessor().notifyMessageArriving(
KeyBuilder.parseNormalTopic(popCheckPoint.getTopic(), popCheckPoint.getCId()),
popCheckPoint.getCId(),
-1
);
}
}
參考資料
歡迎關注公眾號【訊息中介軟體】(middleware-mq),更新訊息中介軟體的原始碼解析和最新動態!
本文由部落格一文多發平臺 OpenWrite 釋出!