zk session expire會引起HA模式的rm一直處於standby嗎

語言: CN / TW / HK

【概述】


最近連續在多個環境中遇到了同一個問題:在HA模式下,兩個resourcemanager均為standby,並且持續沒有選舉出新的leader。經過一番分析,並對照原始碼梳理問題出現前後的邏輯流程,最後發現是因為zk會話過期(session expire)引起的問題,本文就覆盤總結下。

【RM的正常選舉流程】

在很早之前的文章中,介紹過hadoop裡namenode的HA機制(戳這裡),RM的選舉流程其實是複用了同樣的框架,只是以一個獨立執行緒的方式執行,而不是像namenode一樣,有個獨立的程序(zkfc)負責與zk連線並選舉。

因此,整體的選舉流程會和namenode的選舉方式基本雷同,即首先向zk建立連線,當連線建立成功後,在zk上競爭建立臨時鎖節點,成功建立的rm成為active,失敗的則成為standby。

【與zk之間網路異常後的情況】

正常邏輯是相對簡單的,那我們再來看看與zk之間網路出現異常,以及網路異常恢復之後的處理邏輯,具體如下圖所示:

1. 當ZK服務出現故障,或者網路出現故障,導致網路完全不可達時,客戶端與ZK的連線會出現在指定時間內沒有讀到任何資料,從而引發會話超時。(也可能是讀異常,此時產生的是EndOfStreamException,後續處理邏輯與會話超時的邏輯一樣)。

這個時候,zk客戶端的傳送執行緒會拋會話超時的異常,同時內部捕獲該異常, 向事件回撥執行緒的佇列中插入連線斷開的事件。此後,迴圈執行與zk的重連動作。

while (state.isAlive()) {
    try {
        ...
        if (to <= 0) {
            String warnInfo;
            warnInfo = 
                "Client session timed out, have not heard from server in " +
                clientCnxnSocket.getIdleRecv() + "ms" + 
                " for sessionid 0x" + Long.toHexString(sessionId);
            LOG.warn(warnInfo);
            throw new SessionTimeoutException(warnInfo);
        }
    } catch (Throwable e) {
        ...
        if (state.isAlive()) {
            eventThread.queueEvent(
                new WatchedEvent(Event.EventType.None, Event.KeeperState.Disconnected, null));
        }
        ...
    }
}

2. zk客戶端中的事件回撥執行緒接收到事件後,向上進行回撥通知。在RM的回撥處理中,啟動定時器執行緒,觸發成為standby。

synchronized void processWatchEvent(ZooKeeper zk, WatchedEvent event) {
    ...
    if (eventType == Event.EventType.None) {
        switch (event.getState()) {
        case Disconnected:
            LOG.info("Session disconnected. Entering neutral mode...");

            zkConnectionState = ConnectionState.DISCONNECTED;
            enterNeutralMode();
            break;
        ...
        }
    }
}

private void enterNeutralMode() {
    if (state != State.NEUTRAL) {
        if (LOG.isDebugEnabled()) {
            LOG.debug("Entering neutral mode for " + this);
        }
        state = State.NEUTRAL;
        appClient.enterNeutralMode();
    }
}

public void enterNeutralMode() {
    LOG.warn("Lost contact with Zookeeper. Transitioning to standby in "
        + zkSessionTimeout + " ms if connection is not reestablished.");

    // If we've just become disconnected, start a timer. When the time's up,
    // we'll transition to standby.
    synchronized (zkDisconnectLock) {
      if (zkDisconnectTimer == null) {
        zkDisconnectTimer = new Timer("Zookeeper disconnect timer");
        zkDisconnectTimer.schedule(new TimerTask() {
          @Override
          public void run() {
            synchronized (zkDisconnectLock) {
              // Only run if the timer hasn't been cancelled
              if (zkDisconnectTimer != null) {
                becomeStandby();
              }
            }
          }
        }, zkSessionTimeout);
      }
    }
}

3. 當網路恢復後,ZK客戶端重連成功, 但仍舊是攜帶老的會話ID傳送註冊請求,如果重連時間超過了會話過期的時間,那麼服務端會給出相應應答,告知會話過期,同時斷開連線。

此時,ZK客戶端內部發送執行緒會從響應中得到知道會話過期,向事件執行緒傳送會話過期事件以及執行緒退出事件,同時將自身狀態置為CLOSED,並丟擲異常,這樣傳送執行緒也就會退出迴圈從而結束執行

void onConnected(
    int _negotiatedSessionTimeout,
    long _sessionId,
    byte[] _sessionPasswd,
    boolean isRO)
    throws IOException {
    negotiatedSessionTimeout = _negotiatedSessionTimeout;
    if(negotiatedSessionTimeout <= 0) {
        state = States.CLOSED;
        eventThread.queueEvent(new WatchedEvent(
            Watcher.Event.EventType.None,
            Watcher.Event.KeeperState.Expired, null));
        eventThread.queueEventOfDeath();

        String warnInfo;
        warnInfo = 
            "Unable to reconnect to ZooKeeper service, session 0x" +
            Long.toHexString(sessionId) + " has expired";
        LOG.warn(warnInfo);
        throw new SessionExpiredException(warnInfo);
    }
}

4. 在會話過期的回撥處理中,修改自身狀態,並重新參與選舉,這包括關閉當前的客戶端,重新建立新的zk客戶端進行連線,如果能成功連線,則繼續建立鎖節點來進行leader的選舉。

synchronized void processWatchEvent(ZooKeeper zk, WatchedEvent event) {
    ...
    if (eventType == Event.EventType.None) {
        switch (event.getState()) {
        case Expired:
            LOG.info("Session expired. Entering neutral mode and rejoining...");
            enterNeutralMode();
            reJoinElection(0);
            break;
        ...
        }
    }
}

private void reJoinElection(int sleepTime) {
    LOG.info("Trying to re-establish ZK session");
    
    sessionReestablishLockForTests.lock();
    try {
        terminateConnection();
        sleepFor(sleepTime);
        // Should not join election even before the SERVICE is reported
        // as HEALTHY from ZKFC monitoring.
        if (appData != null) {
            joinElectionInternal();
        } else {
            LOG.info("Not joining election since service has not yet been " +
                "reported as healthy.");
        }
    } finally {
        sessionReestablishLockForTests.unlock();
    }
}

private void joinElectionInternal() {
    Preconditions.checkState(appData != null,
        "trying to join election without any app data");
    if (zkClient == null) {
        if (!reEstablishSession()) {
            fatalError("Failed to reEstablish connection with ZooKeeper");
            return;
        }
    }

    createRetryCount = 0;
    wantToBeInElection = true;
    createLockNodeAsync();
}

對於standby的RM,其完整的日誌如下所示:

// 超時會接收到任何資料
2022-09-01 19:10:25,230 WARN org.apache.zookeeper.ClientCnxn: Client session timed out, have not heard from server in 6668ms for sessionid 0x10054aa9d110000
// 異常捕獲
2022-09-01 19:10:25,230 INFO org.apache.zookeeper.ClientCnxn: Client session timed out, have not heard from server in 6668ms for sessionid 0x10054aa9d110000, closing socket connection and attempting reconnect
// RM的回撥處理
2022-09-01 19:10:25,331 INFO org.apache.hadoop.ha.ActiveStandbyElector: Session disconnected. Entering neutral mode...
// 觸發定時器執行緒
2022-09-01 19:10:25,331 WARN org.apache.hadoop.yarn.server.resourcemanager.ActiveStandbyElectorBasedElectorService: Lost contact with Zookeeper. Transitioning to standby in 10000 ms if connection is not reestablished.
// ZK客戶端的傳送執行緒嘗試重連
2022-09-01 19:10:26,905 INFO org.apache.zookeeper.ClientCnxn: Opening socket connection to server zk-0-hncscwc.network-hncscwc/172.168.1.1:2181. Will not attempt to authenticate using SASL (unknown error)
// 定時器執行緒觸發進行狀態的狀態, 但當前狀態已經是standby狀態
2022-09-01 19:10:35,334 INFO org.apache.hadoop.yarn.server.resourcemanager.ResourceManager: Already in standby state
// 重連成功
2022-09-01 19:13:51,101 INFO org.apache.zookeeper.ClientCnxn: Socket connection established to zk-0-hncscwc.network-hncscwc/172.168.1.1:2181, initiating session
// 會話過期, 向事件回撥執行緒佇列插入會話過期的事件
2022-09-01 19:13:51,104 WARN org.apache.zookeeper.ClientCnxn: Unable to reconnect to ZooKeeper service, session 0x10054aa9d110000 has expired
// 回撥處理, 並觸發重新選舉
2022-09-01 19:13:51,104 INFO org.apache.hadoop.ha.ActiveStandbyElector: Session expired. Entering neutral mode and rejoining...
// 傳送執行緒捕獲異常
2022-09-01 19:13:51,105 INFO org.apache.zookeeper.ClientCnxn: Unable to reconnect to ZooKeeper service, session 0x10054aa9d110000 has expired, closing socket connection
// 重新建立連線並進行選舉
2022-09-01 19:13:51,105 INFO org.apache.hadoop.ha.ActiveStandbyElector: Trying to re-establish ZK session
2022-09-01 19:13:51,109 INFO org.apache.zookeeper.ClientCnxn: Socket connection established to zk-0-hncscwc.network-hncscwc/172.168.1.1:2181, initiating session
// 成功建立連線(注意會話ID不同)
2022-09-01 19:13:51,122 INFO org.apache.zookeeper.ClientCnxn: Session establishment complete on server zk-0-hncscwc.network-hncscwc/172.168.1.1:2181, sessionid = 0x10054aa9d110006, negotiated timeout = 10000
// 連線成功建立的回撥
2022-09-01 19:13:51,123 INFO org.apache.hadoop.ha.ActiveStandbyElector: Session connected.

【極端情況引起的BUG】

上面的邏輯分析中提到了,首次重連ZK後觸發會話過期後,會重新建立新的客戶端進行重連動作(畢竟老的會話已經過期,無法再繼續使用)。通常情況下, 網路都是相對穩定的,建立新的客戶端連線肯定可以重連成功,畢竟這一系列動作是連貫的,中間沒有任何睡眠操作。

但如果真的有極端情況,會話過期後重連ZK失敗,並且達到重連的最大次數後仍舊未成功連線ZK。那麼此時,會再向上層回撥一個致命錯誤,對於這型別錯誤的處理,則是建立一個執行緒先進行standby狀態的轉換,然後再進行重新選舉的動作

在這個執行緒中,會對一個原子變數進行判斷(初始值為false)。如果為false,表示當前沒有執行緒在執行這個動作,將該變數置為true,然後進行後續動作。

然而,這個地方,也是BUG所在的地方在這個執行緒中重新進行選舉,其邏輯和之前一樣,依舊是先嚐試連線ZK,如果持續無法連線到ZK,並且達到最大重連次數,則再觸發回撥,建立新執行緒進行後續邏輯但此時,原子變數的值已經被置為true,新的執行緒執行後,判斷該值為true,則直接退出。此後就沒有機會再進行與ZK的重連動作了。

對應的程式碼如下所示:

private void fatalError(String errorMessage) {
    LOG.error(errorMessage);
    reset();
    appClient.notifyFatalError(errorMessage);
}

public void notifyFatalError(String errorMessage) {
    rm.getRMContext().getDispatcher().getEventHandler().handle(
        new RMFatalEvent(RMFatalEventType.EMBEDDED_ELECTOR_FAILED,
            errorMessage));
}

private class RMFatalEventDispatcher implements EventHandler<RMFatalEvent> {
    @Override
    public void handle(RMFatalEvent event) {
        LOG.error("Received " + event);

        if (HAUtil.isHAEnabled(getConfig())) {
            LOG.warn("Transitioning the resource manager to standby.");
            handleTransitionToStandByInNewThread();
        }
        ...
    }
}

private void handleTransitionToStandByInNewThread() {
    Thread standByTransitionThread =
        new Thread(activeServices.standByTransitionRunnable);
    standByTransitionThread.setName("StandByTransitionThread");
    standByTransitionThread.start();
}

private class StandByTransitionRunnable implements Runnable {
    // The atomic variable to make sure multiple threads with the same runnable
    // run only once.
    private final AtomicBoolean hasAlreadyRun = new AtomicBoolean(false);

    @Override
    public void run() {
      // Run this only once, even if multiple threads end up triggering
      // this simultaneously.
      if (hasAlreadyRun.getAndSet(true)) {
        return;
      }

      if (rmContext.isHAEnabled()) {
        try {
          // Transition to standby and reinit active services
          LOG.info("Transitioning RM to Standby mode");
          transitionToStandby(true);
          EmbeddedElector elector = rmContext.getLeaderElectorService();
          if (elector != null) {
            elector.rejoinElection();
          }
        } catch (Exception e) {
          LOG.fatal("Failed to transition RM to Standby mode.", e);
          ExitUtil.terminate(1, e);
        }
      }
    }
}

線上程中進行狀態轉換的過程中,有個細節需要注意:

如果進行轉換時,RM的當前狀態為active,那麼此時會停止activeService並重新初始化,即重新建立一個新的例項物件出來。而前面的原子變數,也會隨著新的例項物件重新被賦值為false。

synchronized void transitionToStandby(boolean initialize)
    throws Exception {
    if (rmContext.getHAServiceState() ==
        HAServiceProtocol.HAServiceState.STANDBY) {
        LOG.info("Already in standby state");
        return;
    }

    LOG.info("Transitioning to standby state");
    HAServiceState state = rmContext.getHAServiceState();
    rmContext.setHAServiceState(HAServiceProtocol.HAServiceState.STANDBY);
    if (state == HAServiceProtocol.HAServiceState.ACTIVE) {
        stopActiveServices();
        reinitialize(initialize);
    }
    LOG.info("Transitioned to standby state");
}

void reinitialize(boolean initialize) {
    ClusterMetrics.destroy();
    QueueMetrics.clearQueueMetrics();
    getResourceScheduler().resetSchedulerMetrics();
    if (initialize) {
        resetRMContext();
        createAndInitActiveServices(true);
    }
}

protected void createAndInitActiveServices(boolean fromActive) {
    activeServices = new RMActiveServices(this);
    activeServices.fromActive = fromActive;
    activeServices.init(conf);
}

同時,此時會走重新初始化建立連線的邏輯流程,因此,這裡是可以正確進行重連。但此後,active的狀態切換為standby,在未成為active之前,如果繼續出現會話過期後的重連ZK失敗,那麼仍舊會出現無法再重連zk的問題。

【可以穩定復現的方式】

清楚問題產生的場景後,也就能比較容易的進行問題復現了,我們可以通過iptables丟棄從zk過來的資料包進行模擬。例如在與ZK的連線斷開一段時間後,再執行下面的指令碼命令,這樣,問題現象大概率就復現出來了。

#!/bin/bash
# 恢復網路
iptables -F
# 短暫睡眠,使其可以重連成功
sleep 0.3
# 再次模擬與ZK的網路異常
iptables -A INPUT -p tcp --sport 2181 -j DROP

【問題解決】

問題的解決其實也很簡單,比如去除原子布林變數的判斷邏輯,同時在後續的執行動作中加鎖保護,避免多執行緒併發操作;另一種更簡單的方式是啟用curator框架,新版本中大多引入了該框架,只是預設為false,即沒有使用,可以配置使用該框架,也能對這個問題進行規避。

 

好了,這就是本文的全部內容,如果覺得本文對您有幫助,請點贊+轉發,也歡迎加我微信交流~

 

本文分享自微信公眾號 - hncscwc(gh_383bc7486c1a)。
如有侵權,請聯絡 [email protected] 刪除。
本文參與“OSC源創計劃”,歡迎正在閱讀的你也加入,一起分享。