HDFS Delegation Tokens Explained in One Article


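【Background】


Some time ago I wrote up token authentication in Hadoop and the tokens used while a YARN job runs, and both posts mentioned the delegation token. Recently I ran into a related problem: a Flink job that had been running for more than seven days failed because of a host fault, which triggered a retry. Several retries in a row also failed, and the job's logs were never aggregated, so there was nothing to analyze the failure with. The cause turned out to be the delegation token, and this article summarizes how it works.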

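【How It Works】


1. What is a delegation token

First, a short explanation of why delegation tokens are needed. Once Kerberos is enabled, services must authenticate with the KDC and obtain a ticket before they talk to each other. A single YARN job can spawn many task containers, and each of those containers may access HDFS. If every access first required a ticket from the KDC, the KDC would quickly become a performance bottleneck. The delegation token exists to avoid this unnecessary authentication work.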

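2. How the delegation token is used during job submission and execution

The delegation-token-related flow during job submission and execution consists of the following steps:

1) When the RM starts, it creates a service thread dedicated to renewing tokens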

// ResourceManager.java
protected void serviceInit(Configuration configuration) throws Exception {
    ...
    if (UserGroupInformation.isSecurityEnabled()) {
        delegationTokenRenewer = createDelegationTokenRenewer();
        rmContext.setDelegationTokenRenewer(delegationTokenRenewer);
    }
    ....
}

protected DelegationTokenRenewer createDelegationTokenRenewer() {
    return new DelegationTokenRenewer();
}

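2) The client requests a delegation token

Before submitting a job, the client usually uploads resource files to HDFS (including the jars needed to run the job). During this step it requests a delegation token from the NN and puts it into the job's launch context, then sends the submission request, which carries the launch context, to the RM.

Below is the relevant code from Flink on YARN job submission: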

// flink YarnClusterDescriptor.java
private ApplicationReport startAppMaster(...){
    // When Kerberos is enabled, obtain the tokens
    if (UserGroupInformation.isSecurityEnabled()) {
      // set HDFS delegation tokens when security is enabled
      LOG.info("Adding delegation token to the AM container.");
      List<Path> yarnAccessList =
        ConfigUtils.decodeListFromConfig(
          configuration, YarnConfigOptions.YARN_ACCESS, Path::new);
      Utils.setTokensFor(
        amContainer,
        ListUtils.union(yarnAccessList, fileUploader.getRemotePaths()),
        yarnConfiguration);
    }
}

public static void setTokensFor(
    ContainerLaunchContext amContainer, List<Path> paths, Configuration conf)
    throws IOException {
    Credentials credentials = new Credentials();
    // for HDFS
    TokenCache.obtainTokensForNamenodes(credentials, paths.toArray(new Path[0]), conf);
    // for HBase
    obtainTokenForHBase(credentials, conf);
    // for user
    UserGroupInformation currUsr = UserGroupInformation.getCurrentUser();
    // Put the obtained tokens into the launch context
    Collection<Token<? extends TokenIdentifier>> usrTok = currUsr.getTokens();
    for (Token<? extends TokenIdentifier> token : usrTok) {
        final Text id = new Text(token.getIdentifier());
        LOG.info("Adding user token " + id + " with " + token);
        credentials.addToken(id, token);
    }
    try (DataOutputBuffer dob = new DataOutputBuffer()) {
        credentials.writeTokenStorageToStream(dob);

        if (LOG.isDebugEnabled()) {
            LOG.debug("Wrote tokens. Credentials buffer length: " + dob.getLength());
        }

        ByteBuffer securityTokens = ByteBuffer.wrap(dob.getData(), 0, dob.getLength());
        amContainer.setTokens(securityTokens);
    }
}

// TokenCache.java
// Call the Hadoop API to request tokens from the NN
public static void obtainTokensForNamenodes(
    Credentials credentials,
    Path[] ps, Configuration conf) 
    throws IOException {
    if (!UserGroupInformation.isSecurityEnabled()) {
        return;
    }
    obtainTokensForNamenodesInternal(credentials, ps, conf);
}

static void obtainTokensForNamenodesInternal(
    Credentials credentials,
    Path[] ps, 
    Configuration conf) 
    throws IOException {
    Set<FileSystem> fsSet = new HashSet<FileSystem>();
    for (Path p : ps) {
        fsSet.add(p.getFileSystem(conf));
    }
    String masterPrincipal = Master.getMasterPrincipal(conf);
    for (FileSystem fs : fsSet) {
        obtainTokensForNamenodesInternal(fs, credentials, conf, masterPrincipal);
    }
}

static void obtainTokensForNamenodesInternal(
    FileSystem fs,
    Credentials credentials, 
    Configuration conf, 
    String renewer)
    throws IOException {
    ...
    final Token<?> tokens[] = fs.addDelegationTokens(delegTokenRenewer, credentials);
    ...
}

// FileSystem.java
public Token<?>[] addDelegationTokens(
    final String renewer, Credentials credentials) 
    throws IOException {
    if (credentials == null) {
        credentials = new Credentials();
    }
    final List<Token<?>> tokens = new ArrayList<>();
    collectDelegationTokens(renewer, credentials, tokens);
    return tokens.toArray(new Token<?>[tokens.size()]);
}

private void collectDelegationTokens(
    final String renewer,
    final Credentials credentials,
    final List<Token<?>> tokens)
    throws IOException {
    final String serviceName = getCanonicalServiceName();
    // Collect token of the this filesystem and then of its embedded children
    if (serviceName != null) { // fs has token, grab it
        final Text service = new Text(serviceName);
        Token<?> token = credentials.getToken(service);
        if (token == null) {
            // Request a delegation token from the NN
            token = getDelegationToken(renewer);
            if (token != null) {
                tokens.add(token);
                credentials.addToken(service, token);
            }
        }
    }
    ...
}
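
The cache check in collectDelegationTokens can be illustrated with a small stand-alone sketch. It is not Hadoop code: the Credentials class below is a simplified stand-in (a map from service name to a token string), and the nameNode function is a hypothetical placeholder for the RPC that issues a token. It only shows that a token is requested when none is cached for the service yet.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

public class TokenCacheSketch {
    // Simplified stand-in for Hadoop's Credentials: service name -> token.
    static final class Credentials {
        private final Map<String, String> tokens = new HashMap<>();
        String getToken(String service) { return tokens.get(service); }
        void addToken(String service, String token) { tokens.put(service, token); }
        int size() { return tokens.size(); }
    }

    // Requests a token only for services with no cached token.
    // Returns how many requests were actually sent to the (fake) NameNode.
    static int collectTokens(List<String> services, Credentials creds,
                             Function<String, String> nameNode) {
        int requests = 0;
        for (String service : services) {
            if (creds.getToken(service) == null) {
                String token = nameNode.apply(service); // hypothetical RPC
                requests++;
                if (token != null) {
                    creds.addToken(service, token);
                }
            }
        }
        return requests;
    }

    public static void main(String[] args) {
        Credentials creds = new Credentials();
        List<String> services =
            Arrays.asList("ha-hdfs:clusterA", "ha-hdfs:clusterA", "ha-hdfs:clusterB");
        int sent = collectTokens(services, creds, s -> "token-for-" + s);
        System.out.println("requests sent: " + sent + ", tokens cached: " + creds.size());
    }
}
```

Calling collectTokens a second time with the same Credentials sends no further requests, which is the behaviour the `token == null` check provides.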

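3) The RM adds the token to the delegation token renewal service

When the RM handles the client's submission request, it checks whether Kerberos authentication is enabled. If it is, the RM parses the delegation token out of the job's launch context and adds it to the delegation token renewal service, which starts threads that renew the token periodically. The RM then sends the container launch request to the NM, and the delegation token travels to the NM inside the launch context.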

// RMAppManager.java
protected void submitApplication(
    ApplicationSubmissionContext submissionContext, 
    long submitTime,
    String user)
    throws YarnException {
    ...
    if (UserGroupInformation.isSecurityEnabled()) {
        this.rmContext.getDelegationTokenRenewer().addApplicationAsync(
            applicationId,
            BuilderUtils.parseCredentials(submissionContext),
            submissionContext.getCancelTokensWhenComplete(),
            application.getUser(),
            BuilderUtils.parseTokensConf(submissionContext));
    }
    ...
}

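4) The NM uses the delegation token

After the NM receives the container launch request, it parses the delegation token out of the request (the job's launch context), constructs an instance object for the container, and stores the delegation token in that object. It then localizes resources for the container, that is, downloads the required resource files from HDFS, and this is where the delegation token is used. When the job ends, if log aggregation is required, the same delegation token is used again to upload the job's logs to the configured HDFS path.

The delegation token is also written to a persistent file. This serves NM recovery after a failure, and it is how the token is handed to the task container process.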

 

3. Renewal and lifetime of the delegation token

1) The token's maximum lifetime is fixed when the token is requested

// FSNamesystem.java
Token<DelegationTokenIdentifier> getDelegationToken(Text renewer) throws IOException {
    ...
    DelegationTokenIdentifier dtId = new DelegationTokenIdentifier(owner, renewer, realUser);
    token = new Token<DelegationTokenIdentifier>(dtId, dtSecretManager);
    ...
    return token;
}

// Token.java
public Token(T id, SecretManager<T> mgr) {
    password = mgr.createPassword(id);
    identifier = id.getBytes();
    kind = id.getKind();
    service = new Text();
}

// AbstractDelegationTokenSecretManager
protected synchronized byte[] createPassword(TokenIdent identifier) {
    long now = Time.now();
    identifier.setMaxDate(now + tokenMaxLifetime);
    ...
}
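
Because the max date is stored inside the identifier and the password is derived from the identifier bytes, a token holder cannot extend the lifetime by editing the identifier. The sketch below illustrates the idea with plain JDK classes. It is a simplified illustration, not the Hadoop implementation: the HmacSHA1 algorithm, the key, and the identifier strings are assumptions chosen for the demo.

```java
import java.nio.charset.StandardCharsets;
import java.security.GeneralSecurityException;
import java.security.MessageDigest;
import javax.crypto.Mac;
import javax.crypto.spec.SecretKeySpec;

public class TokenPasswordSketch {
    // Assumption for this sketch: an HMAC-SHA1 keyed with the master key.
    static final String ALGORITHM = "HmacSHA1";

    // Derives the token password from the identifier bytes and the master key.
    static byte[] createPassword(byte[] identifier, byte[] masterKey) {
        try {
            Mac mac = Mac.getInstance(ALGORITHM);
            mac.init(new SecretKeySpec(masterKey, ALGORITHM));
            return mac.doFinal(identifier);
        } catch (GeneralSecurityException e) {
            throw new IllegalStateException(e);
        }
    }

    // Server-side check: recompute the password and compare in constant time.
    static boolean verify(byte[] identifier, byte[] password, byte[] masterKey) {
        return MessageDigest.isEqual(createPassword(identifier, masterKey), password);
    }

    public static void main(String[] args) {
        byte[] key = "demo-master-key".getBytes(StandardCharsets.UTF_8);
        // Hypothetical identifier layout, for illustration only.
        byte[] id = "owner=demo;renewer=rm;maxDate=604800000".getBytes(StandardCharsets.UTF_8);
        byte[] password = createPassword(id, key);

        byte[] tampered = "owner=demo;renewer=rm;maxDate=9999999999".getBytes(StandardCharsets.UTF_8);
        System.out.println("original identifier accepted: " + verify(id, password, key));
        System.out.println("tampered identifier accepted: " + verify(tampered, password, key));
    }
}
```

The same recompute-and-compare step appears in the server-side renewToken code shown later (createPassword followed by MessageDigest.isEqual).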

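2) After receiving the job submission request, the RM first renews the token once to obtain its next expiration time, and then sets a timer based on that expiration time to trigger subsequent renewals.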

public void addApplicationSync(
    ApplicationId applicationId, 
    Credentials ts,
    boolean shouldCancelAtEnd, 
    String user) 
    throws IOException, InterruptedException {
    handleAppSubmitEvent(
        new DelegationTokenRenewerAppSubmitEvent(
            applicationId, ts, shouldCancelAtEnd, user, new Configuration()));
}

private void handleAppSubmitEvent(AbstractDelegationTokenRenewerAppEvent evt)
    throws IOException, InterruptedException {
    ...
    Credentials ts = evt.getCredentials();
    Collection<Token<?>> tokens = ts.getAllTokens();
    for (Token<?> token : tokens) {
        DelegationTokenToRenew dttr = allTokens.get(token);
        if (dttr == null) {
            dttr = new DelegationTokenToRenew(
                Arrays.asList(applicationId), 
                token, tokenConf, now, shouldCancelAtEnd, 
                evt.getUser());
            try {
                // Renew once up front
                renewToken(dttr);
            } catch (IOException ioe) {
                ...
            }
        }
        tokenList.add(dttr);
    }
    
    if (!tokenList.isEmpty()) {
        for (DelegationTokenToRenew dtr : tokenList) {
            DelegationTokenToRenew currentDtr = allTokens.putIfAbsent(dtr.token, dtr);
            if (currentDtr != null) {
                // another job beat us
                currentDtr.referringAppIds.add(applicationId);
                appTokens.get(applicationId).add(currentDtr);
            } else {
                appTokens.get(applicationId).add(dtr);
                setTimerForTokenRenewal(dtr);
            }
        }
    }
}

protected void renewToken(final DelegationTokenToRenew dttr)
    throws IOException {
    // need to use doAs so that http can find the kerberos tgt
    // NOTE: token renewers should be responsible for the correct UGI!
    try {
        // Renew the delegation token and obtain its next expiration time
        dttr.expirationDate =
            UserGroupInformation.getLoginUser().doAs(
                new PrivilegedExceptionAction<Long>() {
                    @Override
                    public Long run() throws Exception {
                        return dttr.token.renew(dttr.conf);
                    }
                });
    } catch (InterruptedException e) {
        throw new IOException(e);
    }
    LOG.info("Renewed delegation-token= [" + dttr + "]");
}

protected void setTimerForTokenRenewal(DelegationTokenToRenew token)
    throws IOException {
    // calculate timer time
    long expiresIn = token.expirationDate - System.currentTimeMillis();
    if (expiresIn <= 0) {
        LOG.info("Will not renew token " + token);
        return;
    }
    long renewIn = token.expirationDate - expiresIn / 10; // little bit before the expiration
    // need to create new task every time
    RenewalTimerTask tTask = new RenewalTimerTask(token);
    token.setTimerTask(tTask); // keep reference to the timer

    renewalTimer.schedule(token.timerTask, new Date(renewIn));
    LOG.info(
        "Renew " + token + " in " + expiresIn + " ms, appId = " +
        token.referringAppIds);
}

Now let's look at the renewal request and how it is handled:

// The client sends the renewal request
public long renew(Token<?> token, Configuration conf) throws IOException {
    Token<DelegationTokenIdentifier> delToken = (Token<DelegationTokenIdentifier>) token;
    ClientProtocol nn = getNNProxy(delToken, conf);
    try {
        return nn.renewDelegationToken(delToken);
    } catch (RemoteException re) {
        throw re.unwrapRemoteException(InvalidToken.class,
                AccessControlException.class);
    }
}

// The server handles the request
long renewDelegationToken(Token<DelegationTokenIdentifier> token)
    throws InvalidToken, IOException {
    try {
        ...
        expiryTime = dtSecretManager.renewToken(token, renewer);
    } catch (AccessControlException ace) {
        ...
    }
    return expiryTime;
}

public synchronized long renewToken(
    Token<TokenIdent> token,
    String renewer) 
    throws InvalidToken, IOException {
    ByteArrayInputStream buf = new ByteArrayInputStream(token.getIdentifier());
    DataInputStream in = new DataInputStream(buf);
    TokenIdent id = createIdentifier();
    id.readFields(in);
    LOG.info(
        "Token renewal for identifier: " + formatTokenId(id) +
        "; total currentTokens " + currentTokens.size());

    long now = Time.now();
    if (id.getMaxDate() < now) {
        throw new InvalidToken(
            renewer + " tried to renew an expired token " +
            formatTokenId(id) + " max expiration date: " +
            Time.formatTime(id.getMaxDate()) +
            " currentTime: " + Time.formatTime(now));
    }
    if ((id.getRenewer() == null) || (id.getRenewer().toString().isEmpty())) {
        throw new AccessControlException(
            renewer +
            " tried to renew a token " + formatTokenId(id) +
            " without a renewer");
    }
    if (!id.getRenewer().toString().equals(renewer)) {
        throw new AccessControlException(
            renewer +
            " tries to renew a token " + formatTokenId(id) +
            " with non-matching renewer " + id.getRenewer());
    }
    DelegationKey key = getDelegationKey(id.getMasterKeyId());
    if (key == null) {
        throw new InvalidToken(
            "Unable to find master key for keyId=" +
            id.getMasterKeyId() +
            " from cache. Failed to renew an unexpired token " +
            formatTokenId(id) + " with sequenceNumber=" +
            id.getSequenceNumber());
    }
    byte[] password = createPassword(token.getIdentifier(), key.getKey());
    if (!MessageDigest.isEqual(password, token.getPassword())) {
        throw new AccessControlException(
            renewer +
            " is trying to renew a token " +
            formatTokenId(id) + " with wrong password");
    }
    long renewTime = Math.min(id.getMaxDate(), now + tokenRenewInterval);
    String trackingId = getTrackingIdIfEnabled(id);
    DelegationTokenInformation info = 
        new DelegationTokenInformation(renewTime, password, trackingId);

    if (getTokenInfo(id) == null) {
        throw new InvalidToken(
            "Renewal request for unknown token " + formatTokenId(id));
    }
    updateToken(id, info);
    return renewTime;
}
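
Putting the two sides together gives the renewal timeline of a single token. The sketch below is a simplified simulation, not Hadoop code: it assumes zero network latency, a perfectly punctual timer, and a token issued at time 0. The NameNode side caps each new expiry at the max date (the Math.min above), and the RM side schedules the next renewal at the expiry minus one tenth of the remaining validity (as in setTimerForTokenRenewal).

```java
import java.util.ArrayList;
import java.util.List;

public class RenewalTimeline {
    static final long DAY_MS = 24L * 60 * 60 * 1000;

    // NameNode side: extend by one renew interval, but never past maxDate.
    static long renewOnNameNode(long now, long maxDate, long renewInterval) {
        if (maxDate < now) {
            throw new IllegalStateException("tried to renew an expired token");
        }
        return Math.min(maxDate, now + renewInterval);
    }

    // RM side: renew again once 90% of the remaining validity has elapsed.
    static long nextRenewalTime(long now, long expirationDate) {
        long expiresIn = expirationDate - now;
        return expirationDate - expiresIn / 10;
    }

    // Returns the renewal instants in ms since the token was issued.
    static List<Long> simulate(long renewInterval, long maxLifetime) {
        List<Long> renewals = new ArrayList<>();
        long maxDate = maxLifetime; // the token is issued at time 0
        long now = 0;
        while (true) {
            long expiration = renewOnNameNode(now, maxDate, renewInterval);
            renewals.add(now);
            if (expiration - now <= 0) {
                break; // nothing left to extend, the timer is not rescheduled
            }
            now = nextRenewalTime(now, expiration);
        }
        return renewals;
    }

    public static void main(String[] args) {
        for (long t : simulate(DAY_MS, 7 * DAY_MS)) {
            System.out.printf("renew at %.4f days%n", t / (double) DAY_MS);
        }
    }
}
```

With a 1-day renew interval and a 7-day max lifetime, the early renewals are 0.9 days (21h36m) apart, which matches the spacing of the "Renewed delegation-token" lines in the RM log in the problem analysis section. Once the expiry is pinned to the max date, the gap to the next renewal shrinks by a factor of ten each time until nothing is left, which is the burst of renewals visible in that log just before the failure.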

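3) What happens when the token reaches its maximum lifetime

The timer task catches any exception thrown by the renewal and removes the invalid token directly.

Note, however, that before each renewal the RM requests a new delegation token if needed (explained in detail later).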

public void run() {
    if (cancelled.get()) {
        return;
    }

    Token<?> token = dttr.token;

    try {
        // First check whether a new token needs to be requested
        requestNewHdfsDelegationTokenIfNeeded(dttr);
        // if the token is not replaced by a new token, renew the token
        if (!dttr.isTimerCancelled()) {
            renewToken(dttr);
            setTimerForTokenRenewal(dttr);// set the next one
        } else {
            LOG.info("The token was removed already. Token = [" + dttr + "]");
        }
    } catch (Exception e) {
        LOG.error("Exception renewing token" + token + ". Not rescheduled", e);
        removeFailedDelegationToken(dttr);
    }
}

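【Problem Analysis】


Let's go back to the logs from the failure described at the beginning and work through them.

The NM log shows that on retry the job could not start because its resources could not be downloaded (localized), and the download failed because of an invalid token.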

2022-07-18 13:44:18,665 WARN org.apache.hadoop.ipc.Client: Exception encountered while connecting to the server : org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.security.token.SecretManager$InvalidToken): token (HDFS_DELEGATION_TOKEN token 4361 for hncscwc) can't be found in cache
2022-07-18 13:44:18,669 WARN org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ResourceLocalizationService: { hdfs://hdfsHACluster/user/hncscwc/.flink/application_1637733238080_3800/application_1637733238080_38002636034628721129021.tmp, 1656925873322, FILE, null } failed: token (HDFS_DELEGATION_TOKEN token 4361 for hncscwc) can't be found in cache
org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.security.token.SecretManager$InvalidToken): token (HDFS_DELEGATION_TOKEN token 4361 for hncscwc) can't be found in cache
  at org.apache.hadoop.ipc.Client.getRpcResponse(Client.java:1486)
  at org.apache.hadoop.ipc.Client.call(Client.java:1432)
  at org.apache.hadoop.ipc.Client.call(Client.java:1342)
  at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:227)
  at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:116)
  at com.sun.proxy.$Proxy15.getFileInfo(Unknown Source)
  at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.getFileInfo(ClientNamenodeProtocolTranslatorPB.java:796)
  at sun.reflect.GeneratedMethodAccessor172.invoke(Unknown Source)
  at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
  at java.lang.reflect.Method.invoke(Method.java:498)
  at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:411)
  at org.apache.hadoop.io.retry.RetryInvocationHandler$Call.invokeMethod(RetryInvocationHandler.java:165)
  at org.apache.hadoop.io.retry.RetryInvocationHandler$Call.invoke(RetryInvocationHandler.java:157)
  at org.apache.hadoop.io.retry.RetryInvocationHandler$Call.invokeOnce(RetryInvocationHandler.java:95)
  at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:348)
  at com.sun.proxy.$Proxy16.getFileInfo(Unknown Source)
  at org.apache.hadoop.hdfs.DFSClient.getFileInfo(DFSClient.java:1649)
  at org.apache.hadoop.hdfs.DistributedFileSystem$27.doCall(DistributedFileSystem.java:1440)
  at org.apache.hadoop.hdfs.DistributedFileSystem$27.doCall(DistributedFileSystem.java:1437)
  at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
  at org.apache.hadoop.hdfs.DistributedFileSystem.getFileStatus(DistributedFileSystem.java:1452)
  at org.apache.hadoop.yarn.util.FSDownload.copy(FSDownload.java:253)
  at org.apache.hadoop.yarn.util.FSDownload.access$000(FSDownload.java:63)
  at org.apache.hadoop.yarn.util.FSDownload$2.run(FSDownload.java:361)
  at org.apache.hadoop.yarn.util.FSDownload$2.run(FSDownload.java:359)
  at java.security.AccessController.doPrivileged(Native Method)
  at javax.security.auth.Subject.doAs(Subject.java:422)
  at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1922)
  at org.apache.hadoop.yarn.util.FSDownload.call(FSDownload.java:359)
  at org.apache.hadoop.yarn.util.FSDownload.call(FSDownload.java:62)
  at java.util.concurrent.FutureTask.run(FutureTask.java:266)
  at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
  at java.util.concurrent.FutureTask.run(FutureTask.java:266)
  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
  at java.lang.Thread.run(Thread.java:748)

Why was the token invalid? The RM log answers that.

2022-07-04 17:11:13,400 INFO org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler: Application 'application_1637733238080_3800' is submitted without priority hence considering default queue/cluster priority: 0
2022-07-04 17:11:13,424 INFO org.apache.hadoop.yarn.server.resourcemanager.security.DelegationTokenRenewer: Renewed delegation-token= [Kind: HDFS_DELEGATION_TOKEN, Service: ha-hdfs:hdfsHACluster, Ident: (HDFS_DELEGATION_TOKEN token 4361 for hncscwc);exp=1657012273422; apps=[application_1637733238080_3800]]
2022-07-05 14:47:13,462 INFO org.apache.hadoop.yarn.server.resourcemanager.security.DelegationTokenRenewer: Renewed delegation-token= [Kind: HDFS_DELEGATION_TOKEN, Service: ha-hdfs:hdfsHACluster, Ident: (HDFS_DELEGATION_TOKEN token 4361 for hncscwc);exp=1657090033446; apps=[application_1637733238080_3800]]
2022-07-06 12:23:13,467 INFO org.apache.hadoop.yarn.server.resourcemanager.security.DelegationTokenRenewer: Renewed delegation-token= [Kind: HDFS_DELEGATION_TOKEN, Service: ha-hdfs:hdfsHACluster, Ident: (HDFS_DELEGATION_TOKEN token 4361 for hncscwc);exp=1657167793465; apps=[application_1637733238080_3800]]
2022-07-07 09:59:13,487 INFO org.apache.hadoop.yarn.server.resourcemanager.security.DelegationTokenRenewer: Renewed delegation-token= [Kind: HDFS_DELEGATION_TOKEN, Service: ha-hdfs:hdfsHACluster, Ident: (HDFS_DELEGATION_TOKEN token 4361 for hncscwc);exp=1657245553484; apps=[application_1637733238080_3800]]
2022-07-08 07:35:13,532 INFO org.apache.hadoop.yarn.server.resourcemanager.security.DelegationTokenRenewer: Renewed delegation-token= [Kind: HDFS_DELEGATION_TOKEN, Service: ha-hdfs:hdfsHACluster, Ident: (HDFS_DELEGATION_TOKEN token 4361 for hncscwc);exp=1657323313511; apps=[application_1637733238080_3800]]
2022-07-09 05:11:13,551 INFO org.apache.hadoop.yarn.server.resourcemanager.security.DelegationTokenRenewer: Renewed delegation-token= [Kind: HDFS_DELEGATION_TOKEN, Service: ha-hdfs:hdfsHACluster, Ident: (HDFS_DELEGATION_TOKEN token 4361 for hncscwc);exp=1657401073532; apps=[application_1637733238080_3800]]
2022-07-10 02:47:13,564 INFO org.apache.hadoop.yarn.server.resourcemanager.security.DelegationTokenRenewer: Renewed delegation-token= [Kind: HDFS_DELEGATION_TOKEN, Service: ha-hdfs:hdfsHACluster, Ident: (HDFS_DELEGATION_TOKEN token 4361 for hncscwc);exp=1657478833547; apps=[application_1637733238080_3800]]
2022-07-11 00:23:13,591 INFO org.apache.hadoop.yarn.server.resourcemanager.security.DelegationTokenRenewer: Renewed delegation-token= [Kind: HDFS_DELEGATION_TOKEN, Service: ha-hdfs:hdfsHACluster, Ident: (HDFS_DELEGATION_TOKEN token 4361 for hncscwc);exp=1657530673393; apps=[application_1637733238080_3800]]
2022-07-11 17:11:07,361 INFO org.apache.hadoop.yarn.server.resourcemanager.security.DelegationTokenRenewer: Renewed delegation-token= [Kind: HDFS_DELEGATION_TOKEN, Service: ha-hdfs:hdfsHACluster, Ident: (HDFS_DELEGATION_TOKEN token 4361 for hncscwc);exp=1657530673393; apps=[application_1637733238080_3800]]
2022-07-11 17:11:07,361 INFO org.apache.hadoop.yarn.server.resourcemanager.security.DelegationTokenRenewer: Renew Kind: HDFS_DELEGATION_TOKEN, Service: ha-hdfs:hdfsHACluster, Ident: (HDFS_DELEGATION_TOKEN token 4361 for hncscwc);exp=1657530673393; apps=[application_1637733238080_3800] in 6032 ms, appId = [application_1637733238080_3800]
2022-07-11 17:11:12,793 INFO org.apache.hadoop.yarn.server.resourcemanager.security.DelegationTokenRenewer: Renewed delegation-token= [Kind: HDFS_DELEGATION_TOKEN, Service: ha-hdfs:hdfsHACluster, Ident: (HDFS_DELEGATION_TOKEN token 4361 for hncscwc);exp=1657530673393; apps=[application_1637733238080_3800]]
2022-07-11 17:11:12,793 INFO org.apache.hadoop.yarn.server.resourcemanager.security.DelegationTokenRenewer: Renew Kind: HDFS_DELEGATION_TOKEN, Service: ha-hdfs:hdfsHACluster, Ident: (HDFS_DELEGATION_TOKEN token 4361 for hncscwc);exp=1657530673393; apps=[application_1637733238080_3800] in 600 ms, appId = [application_1637733238080_3800]
2022-07-11 17:11:13,337 INFO org.apache.hadoop.yarn.server.resourcemanager.security.DelegationTokenRenewer: Renewed delegation-token= [Kind: HDFS_DELEGATION_TOKEN, Service: ha-hdfs:hdfsHACluster, Ident: (HDFS_DELEGATION_TOKEN token 4361 for hncscwc);exp=1657530673393; apps=[application_1637733238080_3800]]
2022-07-11 17:11:13,337 INFO org.apache.hadoop.yarn.server.resourcemanager.security.DelegationTokenRenewer: Renew Kind: HDFS_DELEGATION_TOKEN, Service: ha-hdfs:hdfsHACluster, Ident: (HDFS_DELEGATION_TOKEN token 4361 for hncscwc);exp=1657530673393; apps=[application_1637733238080_3800] in 56 ms, appId = [application_1637733238080_3800]
2022-07-11 17:11:13,391 INFO org.apache.hadoop.yarn.server.resourcemanager.security.DelegationTokenRenewer: Renewed delegation-token= [Kind: HDFS_DELEGATION_TOKEN, Service: ha-hdfs:hdfsHACluster, Ident: (HDFS_DELEGATION_TOKEN token 4361 for hncscwc);exp=1657530673393; apps=[application_1637733238080_3800]]
2022-07-11 17:11:13,391 INFO org.apache.hadoop.yarn.server.resourcemanager.security.DelegationTokenRenewer: Renew Kind: HDFS_DELEGATION_TOKEN, Service: ha-hdfs:hdfsHACluster, Ident: (HDFS_DELEGATION_TOKEN token 4361 for hncscwc);exp=1657530673393; apps=[application_1637733238080_3800] in 2 ms, appId = [application_1637733238080_3800]
2022-07-11 17:11:13,398 ERROR org.apache.hadoop.yarn.server.resourcemanager.security.DelegationTokenRenewer: Exception renewing tokenKind: HDFS_DELEGATION_TOKEN, Service: ha-hdfs:hdfsHACluster, Ident: (HDFS_DELEGATION_TOKEN token 4361 for hncscwc). Not rescheduled
org.apache.hadoop.security.token.SecretManager$InvalidToken: hadoop tried to renew an expired token (HDFS_DELEGATION_TOKEN token 4361 for hncscwc) max expiration date: 2022-07-11 17:11:13,393+0800 currentTime: 2022-07-11 17:11:13,394+0800
  at org.apache.hadoop.security.token.delegation.AbstractDelegationTokenSecretManager.renewToken(AbstractDelegationTokenSecretManager.java:499)
  at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.renewDelegationToken(FSNamesystem.java:5952)
  at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.renewDelegationToken(NameNodeRpcServer.java:675)
  at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.renewDelegationToken(ClientNamenodeProtocolServerSideTranslatorPB.java:1035)
  at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
  at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:447)
  at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:989)
  at org.apache.hadoop.ipc.Server$RpcCall.run(Server.java:850)
  at org.apache.hadoop.ipc.Server$RpcCall.run(Server.java:793)
  at java.security.AccessController.doPrivileged(Native Method)
  at javax.security.auth.Subject.doAs(Subject.java:422)
  at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1922)
  at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2489)

  at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
  at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
  at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
  at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
  at org.apache.hadoop.ipc.RemoteException.instantiateException(RemoteException.java:121)
  at org.apache.hadoop.ipc.RemoteException.unwrapRemoteException(RemoteException.java:88)
  at org.apache.hadoop.hdfs.DFSClient$Renewer.renew(DFSClient.java:761)
  at org.apache.hadoop.security.token.Token.renew(Token.java:458)
  at org.apache.hadoop.yarn.server.resourcemanager.security.DelegationTokenRenewer$1.run(DelegationTokenRenewer.java:601)
  at org.apache.hadoop.yarn.server.resourcemanager.security.DelegationTokenRenewer$1.run(DelegationTokenRenewer.java:598)
  at java.security.AccessController.doPrivileged(Native Method)
  at javax.security.auth.Subject.doAs(Subject.java:422)
  at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1922)
  at org.apache.hadoop.yarn.server.resourcemanager.security.DelegationTokenRenewer.renewToken(DelegationTokenRenewer.java:597)
  at org.apache.hadoop.yarn.server.resourcemanager.security.DelegationTokenRenewer$RenewalTimerTask.run(DelegationTokenRenewer.java:531)
  at java.util.TimerThread.mainLoop(Timer.java:555)
  at java.util.TimerThread.run(Timer.java:505)
Caused by: org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.security.token.SecretManager$InvalidToken): hadoop tried to renew an expired token (HDFS_DELEGATION_TOKEN token 4361 for hncscwc) max expiration date: 2022-07-11 17:11:13,393+0800 currentTime: 2022-07-11 17:11:13,394+0800
  at org.apache.hadoop.security.token.delegation.AbstractDelegationTokenSecretManager.renewToken(AbstractDelegationTokenSecretManager.java:499)
  at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.renewDelegationToken(FSNamesystem.java:5952)
  at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.renewDelegationToken(NameNodeRpcServer.java:675)
  at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.renewDelegationToken(ClientNamenodeProtocolServerSideTranslatorPB.java:1035)
  at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
  at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:447)
  at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:989)
  at org.apache.hadoop.ipc.Server$RpcCall.run(Server.java:850)
  at org.apache.hadoop.ipc.Server$RpcCall.run(Server.java:793)
  at java.security.AccessController.doPrivileged(Native Method)
  at javax.security.auth.Subject.doAs(Subject.java:422)
  at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1922)
  at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2489)

  at org.apache.hadoop.ipc.Client.getRpcResponse(Client.java:1486)
  at org.apache.hadoop.ipc.Client.call(Client.java:1432)
  at org.apache.hadoop.ipc.Client.call(Client.java:1342)
  at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:227)
  at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:116)
  at com.sun.proxy.$Proxy94.renewDelegationToken(Unknown Source)
  at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.renewDelegationToken(ClientNamenodeProtocolTranslatorPB.java:964)
  at sun.reflect.GeneratedMethodAccessor277.invoke(Unknown Source)
  at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
  at java.lang.reflect.Method.invoke(Method.java:498)
  at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:411)
  at org.apache.hadoop.io.retry.RetryInvocationHandler$Call.invokeMethod(RetryInvocationHandler.java:165)
  at org.apache.hadoop.io.retry.RetryInvocationHandler$Call.invoke(RetryInvocationHandler.java:157)
  at org.apache.hadoop.io.retry.RetryInvocationHandler$Call.invokeOnce(RetryInvocationHandler.java:95)
  at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:348)
  at com.sun.proxy.$Proxy95.renewDelegationToken(Unknown Source)
  at org.apache.hadoop.hdfs.DFSClient$Renewer.renew(DFSClient.java:759)
  ... 10 more
2022-07-11 17:11:13,399 ERROR org.apache.hadoop.yarn.server.resourcemanager.security.DelegationTokenRenewer: removing failed delegation token for appid=[application_1637733238080_3800];t=ha-hdfs:hdfsHACluster

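The log shows that after the job was submitted, the delegation token was renewed about once a day. On the seventh day the token reached its max expiration date, the renewal failed, and the token became invalid. Once it is invalid, the NN removes the token internally. If the job then fails and needs to be retried, or finishes and needs its logs aggregated, the same invalid token is still used to access HDFS. The NN can no longer find it, throws an exception, and the operation fails.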
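
The `exp=` values in the RM log are epoch milliseconds, so the 7-day limit can be checked directly. The sketch below converts the first and last `exp=` values from the log to UTC+8 (the offset shown in the exception message) and subtracts the 7-day default max lifetime from the last one. The class and method names are made up for this example.

```java
import java.time.Duration;
import java.time.Instant;
import java.time.OffsetDateTime;
import java.time.ZoneOffset;

public class LogTimestamps {
    // The exception message in the log prints times with a +0800 offset.
    static final ZoneOffset LOG_OFFSET = ZoneOffset.ofHours(8);

    static OffsetDateTime toLocal(long epochMillis) {
        return Instant.ofEpochMilli(epochMillis).atOffset(LOG_OFFSET);
    }

    public static void main(String[] args) {
        long firstExp = 1657012273422L; // exp= after the renewal at submission
        long lastExp = 1657530673393L;  // exp= once pinned to the max date

        System.out.println("first expiry        : " + toLocal(firstExp));
        System.out.println("max expiration date : " + toLocal(lastExp));
        System.out.println("max date minus 7 days: "
            + toLocal(lastExp).minus(Duration.ofDays(7)));
    }
}
```

The last value corresponds to 2022-07-11 17:11:13,393 +0800, the max expiration date reported in the InvalidToken exception, and subtracting seven days lands on 2022-07-04 17:11:13, the moment the application was submitted.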

 

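【Solution】


The simplest and most direct fix is to increase the delegation token's maximum lifetime.

At first this felt rather crude, though. For long-running real-time Flink jobs in particular, there is no way to know how long a job will run, so there is no way to choose a safe maximum lifetime.

So I went through the source code again and found that, for a delegation token that is about to expire (about to exceed its maximum lifetime), the RM requests a new token if needed. This is the requestNewHdfsDelegationTokenIfNeeded method that the timer thread calls before renewing the token.

Here is the implementation: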

private void requestNewHdfsDelegationTokenIfNeeded(
    final DelegationTokenToRenew dttr) 
    throws IOException, InterruptedException {

    // Has proxy-user privileges, and the token is close to its max lifetime, and it is an HDFS delegation token
    if (hasProxyUserPrivileges &&
        dttr.maxDate - dttr.expirationDate < credentialsValidTimeRemaining &&
        dttr.token.getKind().equals(HDFS_DELEGATION_KIND)) {

        final Collection<ApplicationId> applicationIds;
        synchronized (dttr.referringAppIds) {
            applicationIds = new HashSet<>(dttr.referringAppIds);
            dttr.referringAppIds.clear();
        }
        // remove all old expiring hdfs tokens for this application.
        for (ApplicationId appId : applicationIds) {
            Set<DelegationTokenToRenew> tokenSet = appTokens.get(appId);
            if (tokenSet == null || tokenSet.isEmpty()) {
                continue;
            }
            Iterator<DelegationTokenToRenew> iter = tokenSet.iterator();
            synchronized (tokenSet) {
                while (iter.hasNext()) {
                    DelegationTokenToRenew t = iter.next();
                    if (t.token.getKind().equals(HDFS_DELEGATION_KIND)) {
                        iter.remove();
                        allTokens.remove(t.token);
                        t.cancelTimer();
                        LOG.info("Removed expiring token " + t);
                    }
                }
            }
        }
        LOG.info("Token= (" + dttr + ") is expiring, request new token.");
        requestNewHdfsDelegationTokenAsProxyUser(
            applicationIds, dttr.user,
            dttr.shouldCancelAtEnd);
    }
}
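
The condition at the top of this method can be tried out in isolation. The sketch below copies only that predicate into a stand-alone class; the class name, method name, and the sample values are made up for this example, and times are in milliseconds since the token was issued.

```java
public class ReissueCheck {
    static final long HOUR_MS = 60L * 60 * 1000;
    static final long DAY_MS = 24 * HOUR_MS;
    static final String HDFS_DELEGATION_KIND = "HDFS_DELEGATION_TOKEN";

    // Same three-part condition as requestNewHdfsDelegationTokenIfNeeded.
    static boolean shouldRequestNewToken(boolean hasProxyUserPrivileges,
                                         long maxDate,
                                         long expirationDate,
                                         long credentialsValidTimeRemaining,
                                         String kind) {
        return hasProxyUserPrivileges
            && maxDate - expirationDate < credentialsValidTimeRemaining
            && HDFS_DELEGATION_KIND.equals(kind);
    }

    public static void main(String[] args) {
        long maxDate = 7 * DAY_MS;
        long threeHours = 3 * HOUR_MS; // 10800000 ms

        // Current expiry is 6.4 days: 14.4 hours short of the max date.
        System.out.println(shouldRequestNewToken(
            true, maxDate, 552_960_000L, threeHours, HDFS_DELEGATION_KIND));
        // Current expiry is already pinned to the max date.
        System.out.println(shouldRequestNewToken(
            true, maxDate, maxDate, threeHours, HDFS_DELEGATION_KIND));
        // Same situation, but proxy-user privileges are disabled.
        System.out.println(shouldRequestNewToken(
            false, maxDate, maxDate, threeHours, HDFS_DELEGATION_KIND));
    }
}
```

Because the check runs before renewToken in the timer task, it sees the expiration date produced by the previous renewal. With a 1-day renew interval, a 7-day max lifetime and a 3-hour threshold, that means the check only becomes true once an earlier renewal has already pinned the expiry to the max date, and only if proxy-user privileges are enabled.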

After the new token is obtained, it is updated inside the RM and then synchronized to the NMs through the NM heartbeat response.

private void requestNewHdfsDelegationTokenAsProxyUser(
    ...
    // Get new hdfs tokens for this user
    Credentials credentials = new Credentials();
    Token<?>[] newTokens = obtainSystemTokensForUser(user, credentials);
    DataOutputBuffer dob = new DataOutputBuffer();
    credentials.writeTokenStorageToStream(dob);
    ByteBuffer byteBuffer = ByteBuffer.wrap(dob.getData(), 0, dob.getLength());
    for (ApplicationId applicationId : referringAppIds) {
        // Update the app's delegation token;
        // it is synchronized to the NM on the next heartbeat
        rmContext.getSystemCredentialsForApps().put(applicationId, byteBuffer);
    }
} 

public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request)
    throws YarnException, IOException {
    ...
    ConcurrentMap<ApplicationId, ByteBuffer> systemCredentials =
        rmContext.getSystemCredentialsForApps();
    if (!systemCredentials.isEmpty()) {
        nodeHeartBeatResponse.setSystemCredentialsForApps(systemCredentials);
    }
    ...
}

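The NM parses the token out of the heartbeat response and updates its in-memory copy. The new token is then used when a retried job localizes its resources and when a finished job triggers log aggregation.

Note that only resource localization and log aggregation are mentioned here as users of the new token. Will a task that is already running pick up the new token?

The answer is no (at least not in 2.X). The token was written to a persistent file, and the task reads that file at startup to obtain the token. The new delegation token is not written back to that file, and even if it were, some mechanism would still be needed to tell the task process to re-read it. A running task that keeps accessing HDFS after its token has expired will therefore still get an exception.

In the latest 3.X releases I noticed code changes in this area that appear to notify running containers. I have not studied the details yet and will look into them when time permits.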

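【Related Configuration】


The configuration properties related to delegation tokens are:

Property                                                       Default                Description
dfs.namenode.delegation.key.update-interval                    1 day                  Interval at which the key used for tokens is updated
dfs.namenode.delegation.token.renew-interval                   1 day                  Interval by which each renewal extends the token
dfs.namenode.delegation.token.max-lifetime                     7 days                 Maximum lifetime of a token
yarn.resourcemanager.delegation-token.always-cancel            false                  Whether the RM always cancels the tokens when an application finishes
yarn.resourcemanager.proxy-user-privileges.enabled             false                  Whether the RM has proxy-user privileges to request a new token when a delegation token is about to expire
yarn.resourcemanager.system-credentials.valid-time-remaining   10800000 ms (3 hours)  How long before the maximum lifetime the RM requests a new token
yarn.resourcemanager.delegation-token-renewer.thread-count     50                     Number of delegation token renewer threads in the RM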

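【Summary】


Starting from a real problem and working through the source code, this article explained how Hadoop's delegation token works.

If anything here is wrong, corrections are welcome.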

 


 

This article was originally shared on the WeChat official account hncscwc (gh_383bc7486c1a).