zk session expire会引起HA模式的rm一直处于standby吗

最近连续在多个环境中遇到了同一个问题:在HA模式下,两个resourcemanager均为standby,并且持续没有选举出新的leader。经过一番分析,并对照源码梳理问题出现前后的逻辑流程,最后发现是因为zk会话过期(session expire)引起的问题,本文就复盘总结下。






1. 当ZK服务出现故障,或者网络出现故障,导致网络完全不可达时,客户端与ZK的连接会出现在指定时间内没有读到任何数据,从而引发会话超时。(也可能是读异常,此时产生的是EndOfStreamException,后续处理逻辑与会话超时的逻辑一样)。

这个时候,zk客户端的发送线程会抛会话超时的异常,同时内部捕获该异常, 向事件回调线程的队列中插入连接断开的事件。此后,循环执行与zk的重连动作。

while (state.isAlive()) {
    try {
        if (to <= 0) {
            String warnInfo;
            warnInfo = 
                "Client session timed out, have not heard from server in " +
                clientCnxnSocket.getIdleRecv() + "ms" + 
                " for sessionid 0x" + Long.toHexString(sessionId);
            throw new SessionTimeoutException(warnInfo);
    } catch (Throwable e) {
        if (state.isAlive()) {
                new WatchedEvent(Event.EventType.None, Event.KeeperState.Disconnected, null));

2. zk客户端中的事件回调线程接收到事件后,向上进行回调通知。在RM的回调处理中,启动定时器线程,触发成为standby。

synchronized void processWatchEvent(ZooKeeper zk, WatchedEvent event) {
    if (eventType == Event.EventType.None) {
        switch (event.getState()) {
        case Disconnected:
            LOG.info("Session disconnected. Entering neutral mode...");

            zkConnectionState = ConnectionState.DISCONNECTED;

private void enterNeutralMode() {
    if (state != State.NEUTRAL) {
        if (LOG.isDebugEnabled()) {
            LOG.debug("Entering neutral mode for " + this);
        state = State.NEUTRAL;

public void enterNeutralMode() {
    LOG.warn("Lost contact with Zookeeper. Transitioning to standby in "
        + zkSessionTimeout + " ms if connection is not reestablished.");

    // If we've just become disconnected, start a timer. When the time's up,
    // we'll transition to standby.
    synchronized (zkDisconnectLock) {
      if (zkDisconnectTimer == null) {
        zkDisconnectTimer = new Timer("Zookeeper disconnect timer");
        zkDisconnectTimer.schedule(new TimerTask() {
          public void run() {
            synchronized (zkDisconnectLock) {
              // Only run if the timer hasn't been cancelled
              if (zkDisconnectTimer != null) {
        }, zkSessionTimeout);

3. 当网络恢复后,ZK客户端重连成功, 但仍旧是携带老的会话ID发送注册请求,如果重连时间超过了会话过期的时间,那么服务端会给出相应应答,告知会话过期,同时断开连接。


void onConnected(
    int _negotiatedSessionTimeout,
    long _sessionId,
    byte[] _sessionPasswd,
    boolean isRO)
    throws IOException {
    negotiatedSessionTimeout = _negotiatedSessionTimeout;
    if(negotiatedSessionTimeout <= 0) {
        state = States.CLOSED;
        eventThread.queueEvent(new WatchedEvent(
            Watcher.Event.KeeperState.Expired, null));

        String warnInfo;
        warnInfo = 
            "Unable to reconnect to ZooKeeper service, session 0x" +
            Long.toHexString(sessionId) + " has expired";
        throw new SessionExpiredException(warnInfo);

4. 在会话过期的回调处理中,修改自身状态,并重新参与选举,这包括关闭当前的客户端,重新创建新的zk客户端进行连接,如果能成功连接,则继续创建锁节点来进行leader的选举。

synchronized void processWatchEvent(ZooKeeper zk, WatchedEvent event) {
    if (eventType == Event.EventType.None) {
        switch (event.getState()) {
        case Expired:
            LOG.info("Session expired. Entering neutral mode and rejoining...");

private void reJoinElection(int sleepTime) {
    LOG.info("Trying to re-establish ZK session");
    try {
        // Should not join election even before the SERVICE is reported
        // as HEALTHY from ZKFC monitoring.
        if (appData != null) {
        } else {
            LOG.info("Not joining election since service has not yet been " +
                "reported as healthy.");
    } finally {

private void joinElectionInternal() {
    Preconditions.checkState(appData != null,
        "trying to join election without any app data");
    if (zkClient == null) {
        if (!reEstablishSession()) {
            fatalError("Failed to reEstablish connection with ZooKeeper");

    createRetryCount = 0;
    wantToBeInElection = true;


// 超时会接收到任何数据
2022-09-01 19:10:25,230 WARN org.apache.zookeeper.ClientCnxn: Client session timed out, have not heard from server in 6668ms for sessionid 0x10054aa9d110000
// 异常捕获
2022-09-01 19:10:25,230 INFO org.apache.zookeeper.ClientCnxn: Client session timed out, have not heard from server in 6668ms for sessionid 0x10054aa9d110000, closing socket connection and attempting reconnect
// RM的回调处理
2022-09-01 19:10:25,331 INFO org.apache.hadoop.ha.ActiveStandbyElector: Session disconnected. Entering neutral mode...
// 触发定时器线程
2022-09-01 19:10:25,331 WARN org.apache.hadoop.yarn.server.resourcemanager.ActiveStandbyElectorBasedElectorService: Lost contact with Zookeeper. Transitioning to standby in 10000 ms if connection is not reestablished.
// ZK客户端的发送线程尝试重连
2022-09-01 19:10:26,905 INFO org.apache.zookeeper.ClientCnxn: Opening socket connection to server zk-0-hncscwc.network-hncscwc/ Will not attempt to authenticate using SASL (unknown error)
// 定时器线程触发进行状态的状态, 但当前状态已经是standby状态
2022-09-01 19:10:35,334 INFO org.apache.hadoop.yarn.server.resourcemanager.ResourceManager: Already in standby state
// 重连成功
2022-09-01 19:13:51,101 INFO org.apache.zookeeper.ClientCnxn: Socket connection established to zk-0-hncscwc.network-hncscwc/, initiating session
// 会话过期, 向事件回调线程队列插入会话过期的事件
2022-09-01 19:13:51,104 WARN org.apache.zookeeper.ClientCnxn: Unable to reconnect to ZooKeeper service, session 0x10054aa9d110000 has expired
// 回调处理, 并触发重新选举
2022-09-01 19:13:51,104 INFO org.apache.hadoop.ha.ActiveStandbyElector: Session expired. Entering neutral mode and rejoining...
// 发送线程捕获异常
2022-09-01 19:13:51,105 INFO org.apache.zookeeper.ClientCnxn: Unable to reconnect to ZooKeeper service, session 0x10054aa9d110000 has expired, closing socket connection
// 重新建立连接并进行选举
2022-09-01 19:13:51,105 INFO org.apache.hadoop.ha.ActiveStandbyElector: Trying to re-establish ZK session
2022-09-01 19:13:51,109 INFO org.apache.zookeeper.ClientCnxn: Socket connection established to zk-0-hncscwc.network-hncscwc/, initiating session
// 成功建立连接(注意会话ID不同)
2022-09-01 19:13:51,122 INFO org.apache.zookeeper.ClientCnxn: Session establishment complete on server zk-0-hncscwc.network-hncscwc/, sessionid = 0x10054aa9d110006, negotiated timeout = 10000
// 连接成功建立的回调
2022-09-01 19:13:51,123 INFO org.apache.hadoop.ha.ActiveStandbyElector: Session connected.


上面的逻辑分析中提到了,首次重连ZK后触发会话过期后,会重新创建新的客户端进行重连动作(毕竟老的会话已经过期,无法再继续使用)。通常情况下, 网络都是相对稳定的,创建新的客户端连接肯定可以重连成功,毕竟这一系列动作是连贯的,中间没有任何睡眠操作。





private void fatalError(String errorMessage) {

public void notifyFatalError(String errorMessage) {
        new RMFatalEvent(RMFatalEventType.EMBEDDED_ELECTOR_FAILED,

private class RMFatalEventDispatcher implements EventHandler<RMFatalEvent> {
    public void handle(RMFatalEvent event) {
        LOG.error("Received " + event);

        if (HAUtil.isHAEnabled(getConfig())) {
            LOG.warn("Transitioning the resource manager to standby.");

private void handleTransitionToStandByInNewThread() {
    Thread standByTransitionThread =
        new Thread(activeServices.standByTransitionRunnable);

private class StandByTransitionRunnable implements Runnable {
    // The atomic variable to make sure multiple threads with the same runnable
    // run only once.
    private final AtomicBoolean hasAlreadyRun = new AtomicBoolean(false);

    public void run() {
      // Run this only once, even if multiple threads end up triggering
      // this simultaneously.
      if (hasAlreadyRun.getAndSet(true)) {

      if (rmContext.isHAEnabled()) {
        try {
          // Transition to standby and reinit active services
          LOG.info("Transitioning RM to Standby mode");
          EmbeddedElector elector = rmContext.getLeaderElectorService();
          if (elector != null) {
        } catch (Exception e) {
          LOG.fatal("Failed to transition RM to Standby mode.", e);
          ExitUtil.terminate(1, e);



synchronized void transitionToStandby(boolean initialize)
    throws Exception {
    if (rmContext.getHAServiceState() ==
        HAServiceProtocol.HAServiceState.STANDBY) {
        LOG.info("Already in standby state");

    LOG.info("Transitioning to standby state");
    HAServiceState state = rmContext.getHAServiceState();
    if (state == HAServiceProtocol.HAServiceState.ACTIVE) {
    LOG.info("Transitioned to standby state");

void reinitialize(boolean initialize) {
    if (initialize) {

protected void createAndInitActiveServices(boolean fromActive) {
    activeServices = new RMActiveServices(this);
    activeServices.fromActive = fromActive;




# 恢复网络
iptables -F
# 短暂睡眠,使其可以重连成功
sleep 0.3
# 再次模拟与ZK的网络异常
iptables -A INPUT -p tcp --sport 2181 -j DROP






