zk session expire會引起HA模式的rm一直處於standby嗎
【概述】
最近連續在多個環境中遇到了同一個問題:在HA模式下,兩個resourcemanager均為standby,並且持續沒有選舉出新的leader。經過一番分析,並對照原始碼梳理問題出現前後的邏輯流程,最後發現是因為zk會話過期(session expire)引起的問題,本文就覆盤總結下。
【RM的正常選舉流程】
在很早之前的文章中,介紹過hadoop裡namenode的HA機制(戳這裡),RM的選舉流程其實是複用了同樣的框架,只是以一個獨立執行緒的方式執行,而不是像namenode一樣,有個獨立的程序(zkfc)負責與zk連線並選舉。
因此,整體的選舉流程會和namenode的選舉方式基本雷同,即首先向zk建立連線,當連線建立成功後,在zk上競爭建立臨時鎖節點,成功建立的rm成為active,失敗的則成為standby。
【與zk之間網路異常後的情況】
正常邏輯是相對簡單的,那我們再來看看與zk之間網路出現異常,以及網路異常恢復之後的處理邏輯,具體如下圖所示:
1. 當ZK服務出現故障,或者網路出現故障,導致網路完全不可達時,客戶端與ZK的連線會出現在指定時間內沒有讀到任何資料,從而引發會話超時。(也可能是讀異常,此時產生的是EndOfStreamException,後續處理邏輯與會話超時的邏輯一樣)。
這個時候,zk客戶端的傳送執行緒會拋會話超時的異常,同時內部捕獲該異常, 向事件回撥執行緒的佇列中插入連線斷開的事件。此後,迴圈執行與zk的重連動作。
while (state.isAlive()) {
try {
...
if (to <= 0) {
String warnInfo;
warnInfo =
"Client session timed out, have not heard from server in " +
clientCnxnSocket.getIdleRecv() + "ms" +
" for sessionid 0x" + Long.toHexString(sessionId);
LOG.warn(warnInfo);
throw new SessionTimeoutException(warnInfo);
}
} catch (Throwable e) {
...
if (state.isAlive()) {
eventThread.queueEvent(
new WatchedEvent(Event.EventType.None, Event.KeeperState.Disconnected, null));
}
...
}
}
2. zk客戶端中的事件回撥執行緒接收到事件後,向上進行回撥通知。在RM的回撥處理中,啟動定時器執行緒,觸發成為standby。
synchronized void processWatchEvent(ZooKeeper zk, WatchedEvent event) {
...
if (eventType == Event.EventType.None) {
switch (event.getState()) {
case Disconnected:
LOG.info("Session disconnected. Entering neutral mode...");
zkConnectionState = ConnectionState.DISCONNECTED;
enterNeutralMode();
break;
...
}
}
}
private void enterNeutralMode() {
if (state != State.NEUTRAL) {
if (LOG.isDebugEnabled()) {
LOG.debug("Entering neutral mode for " + this);
}
state = State.NEUTRAL;
appClient.enterNeutralMode();
}
}
public void enterNeutralMode() {
LOG.warn("Lost contact with Zookeeper. Transitioning to standby in "
+ zkSessionTimeout + " ms if connection is not reestablished.");
// If we've just become disconnected, start a timer. When the time's up,
// we'll transition to standby.
synchronized (zkDisconnectLock) {
if (zkDisconnectTimer == null) {
zkDisconnectTimer = new Timer("Zookeeper disconnect timer");
zkDisconnectTimer.schedule(new TimerTask() {
@Override
public void run() {
synchronized (zkDisconnectLock) {
// Only run if the timer hasn't been cancelled
if (zkDisconnectTimer != null) {
becomeStandby();
}
}
}
}, zkSessionTimeout);
}
}
}
3. 當網路恢復後,ZK客戶端重連成功, 但仍舊是攜帶老的會話ID傳送註冊請求,如果重連時間超過了會話過期的時間,那麼服務端會給出相應應答,告知會話過期,同時斷開連線。
此時,ZK客戶端內部發送執行緒會從響應中得到知道會話過期,向事件執行緒傳送會話過期事件以及執行緒退出事件,同時將自身狀態置為CLOSED,並丟擲異常,這樣傳送執行緒也就會退出迴圈從而結束執行。
void onConnected(
int _negotiatedSessionTimeout,
long _sessionId,
byte[] _sessionPasswd,
boolean isRO)
throws IOException {
negotiatedSessionTimeout = _negotiatedSessionTimeout;
if(negotiatedSessionTimeout <= 0) {
state = States.CLOSED;
eventThread.queueEvent(new WatchedEvent(
Watcher.Event.EventType.None,
Watcher.Event.KeeperState.Expired, null));
eventThread.queueEventOfDeath();
String warnInfo;
warnInfo =
"Unable to reconnect to ZooKeeper service, session 0x" +
Long.toHexString(sessionId) + " has expired";
LOG.warn(warnInfo);
throw new SessionExpiredException(warnInfo);
}
}
4. 在會話過期的回撥處理中,修改自身狀態,並重新參與選舉,這包括關閉當前的客戶端,重新建立新的zk客戶端進行連線,如果能成功連線,則繼續建立鎖節點來進行leader的選舉。
synchronized void processWatchEvent(ZooKeeper zk, WatchedEvent event) {
...
if (eventType == Event.EventType.None) {
switch (event.getState()) {
case Expired:
LOG.info("Session expired. Entering neutral mode and rejoining...");
enterNeutralMode();
reJoinElection(0);
break;
...
}
}
}
private void reJoinElection(int sleepTime) {
LOG.info("Trying to re-establish ZK session");
sessionReestablishLockForTests.lock();
try {
terminateConnection();
sleepFor(sleepTime);
// Should not join election even before the SERVICE is reported
// as HEALTHY from ZKFC monitoring.
if (appData != null) {
joinElectionInternal();
} else {
LOG.info("Not joining election since service has not yet been " +
"reported as healthy.");
}
} finally {
sessionReestablishLockForTests.unlock();
}
}
private void joinElectionInternal() {
Preconditions.checkState(appData != null,
"trying to join election without any app data");
if (zkClient == null) {
if (!reEstablishSession()) {
fatalError("Failed to reEstablish connection with ZooKeeper");
return;
}
}
createRetryCount = 0;
wantToBeInElection = true;
createLockNodeAsync();
}
對於standby的RM,其完整的日誌如下所示:
// 超時會接收到任何資料
2022-09-01 19:10:25,230 WARN org.apache.zookeeper.ClientCnxn: Client session timed out, have not heard from server in 6668ms for sessionid 0x10054aa9d110000
// 異常捕獲
2022-09-01 19:10:25,230 INFO org.apache.zookeeper.ClientCnxn: Client session timed out, have not heard from server in 6668ms for sessionid 0x10054aa9d110000, closing socket connection and attempting reconnect
// RM的回撥處理
2022-09-01 19:10:25,331 INFO org.apache.hadoop.ha.ActiveStandbyElector: Session disconnected. Entering neutral mode...
// 觸發定時器執行緒
2022-09-01 19:10:25,331 WARN org.apache.hadoop.yarn.server.resourcemanager.ActiveStandbyElectorBasedElectorService: Lost contact with Zookeeper. Transitioning to standby in 10000 ms if connection is not reestablished.
// ZK客戶端的傳送執行緒嘗試重連
2022-09-01 19:10:26,905 INFO org.apache.zookeeper.ClientCnxn: Opening socket connection to server zk-0-hncscwc.network-hncscwc/172.168.1.1:2181. Will not attempt to authenticate using SASL (unknown error)
// 定時器執行緒觸發進行狀態的狀態, 但當前狀態已經是standby狀態
2022-09-01 19:10:35,334 INFO org.apache.hadoop.yarn.server.resourcemanager.ResourceManager: Already in standby state
// 重連成功
2022-09-01 19:13:51,101 INFO org.apache.zookeeper.ClientCnxn: Socket connection established to zk-0-hncscwc.network-hncscwc/172.168.1.1:2181, initiating session
// 會話過期, 向事件回撥執行緒佇列插入會話過期的事件
2022-09-01 19:13:51,104 WARN org.apache.zookeeper.ClientCnxn: Unable to reconnect to ZooKeeper service, session 0x10054aa9d110000 has expired
// 回撥處理, 並觸發重新選舉
2022-09-01 19:13:51,104 INFO org.apache.hadoop.ha.ActiveStandbyElector: Session expired. Entering neutral mode and rejoining...
// 傳送執行緒捕獲異常
2022-09-01 19:13:51,105 INFO org.apache.zookeeper.ClientCnxn: Unable to reconnect to ZooKeeper service, session 0x10054aa9d110000 has expired, closing socket connection
// 重新建立連線並進行選舉
2022-09-01 19:13:51,105 INFO org.apache.hadoop.ha.ActiveStandbyElector: Trying to re-establish ZK session
2022-09-01 19:13:51,109 INFO org.apache.zookeeper.ClientCnxn: Socket connection established to zk-0-hncscwc.network-hncscwc/172.168.1.1:2181, initiating session
// 成功建立連線(注意會話ID不同)
2022-09-01 19:13:51,122 INFO org.apache.zookeeper.ClientCnxn: Session establishment complete on server zk-0-hncscwc.network-hncscwc/172.168.1.1:2181, sessionid = 0x10054aa9d110006, negotiated timeout = 10000
// 連線成功建立的回撥
2022-09-01 19:13:51,123 INFO org.apache.hadoop.ha.ActiveStandbyElector: Session connected.
【極端情況引起的BUG】
上面的邏輯分析中提到了,首次重連ZK後觸發會話過期後,會重新建立新的客戶端進行重連動作(畢竟老的會話已經過期,無法再繼續使用)。通常情況下, 網路都是相對穩定的,建立新的客戶端連線肯定可以重連成功,畢竟這一系列動作是連貫的,中間沒有任何睡眠操作。
但如果真的有極端情況,會話過期後重連ZK失敗,並且達到重連的最大次數後仍舊未成功連線ZK。那麼此時,會再向上層回撥一個致命錯誤,對於這型別錯誤的處理,則是建立一個執行緒先進行standby狀態的轉換,然後再進行重新選舉的動作。
在這個執行緒中,會對一個原子變數進行判斷(初始值為false)。如果為false,表示當前沒有執行緒在執行這個動作,將該變數置為true,然後進行後續動作。
然而,這個地方,也是BUG所在的地方。在這個執行緒中重新進行選舉,其邏輯和之前一樣,依舊是先嚐試連線ZK,如果持續無法連線到ZK,並且達到最大重連次數,則再觸發回撥,建立新執行緒進行後續邏輯。但此時,原子變數的值已經被置為true,新的執行緒執行後,判斷該值為true,則直接退出。此後就沒有機會再進行與ZK的重連動作了。
對應的程式碼如下所示:
private void fatalError(String errorMessage) {
LOG.error(errorMessage);
reset();
appClient.notifyFatalError(errorMessage);
}
public void notifyFatalError(String errorMessage) {
rm.getRMContext().getDispatcher().getEventHandler().handle(
new RMFatalEvent(RMFatalEventType.EMBEDDED_ELECTOR_FAILED,
errorMessage));
}
private class RMFatalEventDispatcher implements EventHandler<RMFatalEvent> {
@Override
public void handle(RMFatalEvent event) {
LOG.error("Received " + event);
if (HAUtil.isHAEnabled(getConfig())) {
LOG.warn("Transitioning the resource manager to standby.");
handleTransitionToStandByInNewThread();
}
...
}
}
private void handleTransitionToStandByInNewThread() {
Thread standByTransitionThread =
new Thread(activeServices.standByTransitionRunnable);
standByTransitionThread.setName("StandByTransitionThread");
standByTransitionThread.start();
}
private class StandByTransitionRunnable implements Runnable {
// The atomic variable to make sure multiple threads with the same runnable
// run only once.
private final AtomicBoolean hasAlreadyRun = new AtomicBoolean(false);
@Override
public void run() {
// Run this only once, even if multiple threads end up triggering
// this simultaneously.
if (hasAlreadyRun.getAndSet(true)) {
return;
}
if (rmContext.isHAEnabled()) {
try {
// Transition to standby and reinit active services
LOG.info("Transitioning RM to Standby mode");
transitionToStandby(true);
EmbeddedElector elector = rmContext.getLeaderElectorService();
if (elector != null) {
elector.rejoinElection();
}
} catch (Exception e) {
LOG.fatal("Failed to transition RM to Standby mode.", e);
ExitUtil.terminate(1, e);
}
}
}
}
線上程中進行狀態轉換的過程中,有個細節需要注意:
如果進行轉換時,RM的當前狀態為active,那麼此時會停止activeService並重新初始化,即重新建立一個新的例項物件出來。而前面的原子變數,也會隨著新的例項物件重新被賦值為false。
synchronized void transitionToStandby(boolean initialize)
throws Exception {
if (rmContext.getHAServiceState() ==
HAServiceProtocol.HAServiceState.STANDBY) {
LOG.info("Already in standby state");
return;
}
LOG.info("Transitioning to standby state");
HAServiceState state = rmContext.getHAServiceState();
rmContext.setHAServiceState(HAServiceProtocol.HAServiceState.STANDBY);
if (state == HAServiceProtocol.HAServiceState.ACTIVE) {
stopActiveServices();
reinitialize(initialize);
}
LOG.info("Transitioned to standby state");
}
void reinitialize(boolean initialize) {
ClusterMetrics.destroy();
QueueMetrics.clearQueueMetrics();
getResourceScheduler().resetSchedulerMetrics();
if (initialize) {
resetRMContext();
createAndInitActiveServices(true);
}
}
protected void createAndInitActiveServices(boolean fromActive) {
activeServices = new RMActiveServices(this);
activeServices.fromActive = fromActive;
activeServices.init(conf);
}
同時,此時會走重新初始化建立連線的邏輯流程,因此,這裡是可以正確進行重連。但此後,active的狀態切換為standby,在未成為active之前,如果繼續出現會話過期後的重連ZK失敗,那麼仍舊會出現無法再重連zk的問題。
【可以穩定復現的方式】
清楚問題產生的場景後,也就能比較容易的進行問題復現了,我們可以通過iptables丟棄從zk過來的資料包進行模擬。例如在與ZK的連線斷開一段時間後,再執行下面的指令碼命令,這樣,問題現象大概率就復現出來了。
#!/bin/bash
# 恢復網路
iptables -F
# 短暫睡眠,使其可以重連成功
sleep 0.3
# 再次模擬與ZK的網路異常
iptables -A INPUT -p tcp --sport 2181 -j DROP
【問題解決】
問題的解決其實也很簡單,比如去除原子布林變數的判斷邏輯,同時在後續的執行動作中加鎖保護,避免多執行緒併發操作;另一種更簡單的方式是啟用curator框架,新版本中大多引入了該框架,只是預設為false,即沒有使用,可以配置使用該框架,也能對這個問題進行規避。
好了,這就是本文的全部內容,如果覺得本文對您有幫助,請點贊+轉發,也歡迎加我微信交流~
本文分享自微信公眾號 - hncscwc(gh_383bc7486c1a)。
如有侵權,請聯絡 [email protected] 刪除。
本文參與“OSC源創計劃”,歡迎正在閱讀的你也加入,一起分享。
- zk session expire會引起HA模式的rm一直處於standby嗎
- yarn節點屬性及排程
- 基於ranger的kafka許可權控制
- kafka的訪問控制
- 一文講透hdfs的delegation token
- kafka客戶端訊息傳送邏輯
- 原始碼閱讀之我見
- 容量排程絕對值配置佇列使用與避坑
- 2.X版本的一個通病問題
- 被這個引數三殺了
- 一文搞懂hadoop中的使用者
- 正確理解Yarn容量排程中的capacity引數
- 從hudi持久化檔案理解其核心概念
- 一文搞懂hadoop的metrics
- hdfs——nn的啟動優化
- HDFS用了這個優化後,效能直接翻倍
- 說說hdfs是如何處理塊副本多餘和缺失的
- 5000字12張圖講解nn記憶體中的元資料資訊
- 一文搞懂Hadoop Archive
- YARN——標籤排程