
HDFS delegation tokens, explained in one article


【Background】


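Some time ago I wrote up token authentication in Hadoop and the tokens used while a YARN application runs, and both pieces touched on the delegation token. Recently I ran into a related problem. A Flink job had been running for more than seven days when a host failure made it fail, which triggered a retry. Several retries in a row all failed. The job's logs had not been aggregated either, so there was no way to tell why it failed. The root cause turned out to be the delegation token, so this article summarizes how it works.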

【How it works】


1. What is a delegation token

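First, a quick look at why delegation tokens are needed. With Kerberos enabled, every service must authenticate with the KDC and obtain a ticket before it can talk to another service. A running YARN application may launch many containers, and each of them may access HDFS. If every access first required a ticket, the KDC would easily become a performance bottleneck. The delegation token exists to remove this unnecessary authentication work.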
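Why can the NN trust a delegation token without asking the KDC? A token carries an identifier and a password. The password is an HMAC of the identifier, computed with a master key that only the NN holds. To verify a token, the NN recomputes the HMAC and compares the two values. The `createPassword` and `MessageDigest.isEqual` calls in the NN's renewToken code later in this article do exactly this. The Java sketch below illustrates only that idea. The class name, the string-encoded identifier and the key are hypothetical. HmacSHA1 is an assumption: as far as I know it is the default algorithm of Hadoop's `SecretManager`.

```java
import java.nio.charset.StandardCharsets;
import java.security.GeneralSecurityException;
import java.security.MessageDigest;
import javax.crypto.Mac;
import javax.crypto.spec.SecretKeySpec;

// Hypothetical sketch, not Hadoop's real classes: the NN signs a token identifier
// with a master key only it knows, and later verifies a presented token by
// recomputing the HMAC locally instead of contacting the KDC.
final class TokenPasswordSketch {
    // Assumption: HmacSHA1, believed to be the default of Hadoop's SecretManager.
    static final String ALGORITHM = "HmacSHA1";

    static byte[] createPassword(byte[] identifier, byte[] masterKey) {
        try {
            Mac mac = Mac.getInstance(ALGORITHM);
            mac.init(new SecretKeySpec(masterKey, ALGORITHM));
            return mac.doFinal(identifier);
        } catch (GeneralSecurityException e) {
            throw new IllegalStateException(e);
        }
    }

    static boolean verify(byte[] identifier, byte[] password, byte[] masterKey) {
        // Constant-time comparison, the same call the NN uses when renewing a token.
        return MessageDigest.isEqual(createPassword(identifier, masterKey), password);
    }

    public static void main(String[] args) {
        byte[] masterKey = "hypothetical-nn-master-key".getBytes(StandardCharsets.UTF_8);
        // Hypothetical identifier encoding; the real one is a serialized
        // DelegationTokenIdentifier (owner, renewer, issue/max dates, sequence number, ...).
        byte[] id = "owner=hncscwc;renewer=yarn;seq=4361".getBytes(StandardCharsets.UTF_8);
        byte[] password = createPassword(id, masterKey);

        byte[] tampered = "owner=hncscwc;renewer=yarn;seq=4362".getBytes(StandardCharsets.UTF_8);
        System.out.println(verify(id, password, masterKey));       // true
        System.out.println(verify(tampered, password, masterKey)); // false
    }
}
```

A tampered identifier (here, a different sequence number) no longer matches the password. This is why a container cannot forge or alter a token it was handed.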

2. How delegation tokens are used while an application is submitted and run

The figure below shows the delegation-token flow during application submission and execution:

[Figure: delegation token flow during application submission and execution]

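1) First, when the RM starts, it creates an internal service (DelegationTokenRenewer) dedicated to renewing tokens. As the code shows, this only happens when Kerberos security is enabled.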

  // ResourceManager.java
  protected void serviceInit(Configuration configuration) throws Exception {
      ...
      if (UserGroupInformation.isSecurityEnabled()) {
          delegationTokenRenewer = createDelegationTokenRenewer();
          rmContext.setDelegationTokenRenewer(delegationTokenRenewer);
      }
      ...
  }
  protected DelegationTokenRenewer createDelegationTokenRenewer() {
      return new DelegationTokenRenewer();
  }

2) The client requests a delegation token

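Before submitting an application, the client usually uploads its resource files to HDFS, including the jars it needs at runtime. During this step it requests a delegation token from the NN and puts the token into the application's container launch context. It then sends the submit request, which carries the launch context, to the RM.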

The following excerpt comes from the submission code of Flink on YARN:

  // flink YarnClusterDescriptor.java
  private ApplicationReport startAppMaster(...) {
      ...
      // With Kerberos enabled, obtain delegation tokens
      if (UserGroupInformation.isSecurityEnabled()) {
        // set HDFS delegation tokens when security is enabled
        LOG.info("Adding delegation token to the AM container.");
        List<Path> yarnAccessList =
          ConfigUtils.decodeListFromConfig(
            configuration, YarnConfigOptions.YARN_ACCESS, Path::new);
        Utils.setTokensFor(
          amContainer,
          ListUtils.union(yarnAccessList, fileUploader.getRemotePaths()),
          yarnConfiguration);
      }
      ...
  }
  // flink Utils.java
  public static void setTokensFor(
      ContainerLaunchContext amContainer, List<Path> paths, Configuration conf)
      throws IOException {
      Credentials credentials = new Credentials();
      // for HDFS
      TokenCache.obtainTokensForNamenodes(credentials, paths.toArray(new Path[0]), conf);
      // for HBase
      obtainTokenForHBase(credentials, conf);
      // for user: also carry over the tokens the current user already holds
      UserGroupInformation currUsr = UserGroupInformation.getCurrentUser();
      Collection<Token<? extends TokenIdentifier>> usrTok = currUsr.getTokens();
      for (Token<? extends TokenIdentifier> token : usrTok) {
          final Text id = new Text(token.getIdentifier());
          LOG.info("Adding user token " + id + " with " + token);
          credentials.addToken(id, token);
      }
      // Serialize all collected tokens into the AM container launch context
      try (DataOutputBuffer dob = new DataOutputBuffer()) {
          credentials.writeTokenStorageToStream(dob);
          if (LOG.isDebugEnabled()) {
              LOG.debug("Wrote tokens. Credentials buffer length: " + dob.getLength());
          }
          ByteBuffer securityTokens = ByteBuffer.wrap(dob.getData(), 0, dob.getLength());
          amContainer.setTokens(securityTokens);
      }
  }
  // TokenCache.java
  // Calls the Hadoop API to request tokens from the NN
  public static void obtainTokensForNamenodes(
      Credentials credentials,
      Path[] ps, Configuration conf)
      throws IOException {
      if (!UserGroupInformation.isSecurityEnabled()) {
          return;
      }
      obtainTokensForNamenodesInternal(credentials, ps, conf);
  }
  static void obtainTokensForNamenodesInternal(
      Credentials credentials,
      Path[] ps,
      Configuration conf)
      throws IOException {
      Set<FileSystem> fsSet = new HashSet<FileSystem>();
      for (Path p : ps) {
          fsSet.add(p.getFileSystem(conf));
      }
      // The master (RM) principal is passed down as the token's renewer
      String masterPrincipal = Master.getMasterPrincipal(conf);
      for (FileSystem fs : fsSet) {
          obtainTokensForNamenodesInternal(fs, credentials, conf, masterPrincipal);
      }
  }
  static void obtainTokensForNamenodesInternal(
      FileSystem fs,
      Credentials credentials,
      Configuration conf,
      String renewer)
      throws IOException {
      ... // (elided: delegTokenRenewer is derived from renewer here)
      final Token<?> tokens[] = fs.addDelegationTokens(delegTokenRenewer, credentials);
      ...
  }
  // FileSystem.java
  public Token<?>[] addDelegationTokens(
      final String renewer, Credentials credentials)
      throws IOException {
      if (credentials == null) {
          credentials = new Credentials();
      }
      final List<Token<?>> tokens = new ArrayList<>();
      collectDelegationTokens(renewer, credentials, tokens);
      return tokens.toArray(new Token<?>[tokens.size()]);
  }
  private void collectDelegationTokens(
      final String renewer,
      final Credentials credentials,
      final List<Token<?>> tokens)
      throws IOException {
      final String serviceName = getCanonicalServiceName();
      // Collect the token of this filesystem and then of its embedded children
      if (serviceName != null) { // fs has token, grab it
          final Text service = new Text(serviceName);
          Token<?> token = credentials.getToken(service);
          if (token == null) {
              // Request a delegation token from the NN
              token = getDelegationToken(renewer);
              if (token != null) {
                  tokens.add(token);
                  credentials.addToken(service, token);
              }
          }
      }
      ...
  }
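The detail worth noticing in this chain is how tokens are cached. `Credentials` stores tokens under each filesystem's canonical service name. `collectDelegationTokens` only calls `getDelegationToken(renewer)` when nothing is stored for that service yet. So however many paths a job ships, each NameNode service is asked only once. The renewer written into the token is whatever principal the caller passed down, here `Master.getMasterPrincipal(conf)`.

The Java sketch below models only that caching. Everything in it is a hypothetical simplification:
- the class and the two-field `Token`;
- the counter that stands in for the RPC;
- the renewer principal;
- the assumed mapping from `hdfs://<nameservice>/...` to the service string `ha-hdfs:<nameservice>`, which is the form that appears in the RM log later in this article.

```java
import java.net.URI;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical model of TokenCache/collectDelegationTokens: tokens are cached by
// service name, so each NameNode service is asked for a token at most once.
final class CollectTokensSketch {
    // Simplified stand-in for Token<?>: the service it belongs to and its renewer.
    static final class Token {
        final String service;
        final String renewer;

        Token(String service, String renewer) {
            this.service = service;
            this.renewer = renewer;
        }
    }

    // Number of simulated getDelegationToken RPCs sent to the NameNode.
    int rpcCount = 0;

    // Assumption: an HA path hdfs://<nameservice>/... maps to service "ha-hdfs:<nameservice>".
    static String serviceName(String path) {
        return "ha-hdfs:" + URI.create(path).getAuthority();
    }

    List<Token> obtainTokens(List<String> paths, String renewer, Map<String, Token> credentials) {
        // Many paths usually resolve to one filesystem (the fsSet in TokenCache).
        Set<String> services = new LinkedHashSet<>();
        for (String path : paths) {
            services.add(serviceName(path));
        }
        List<Token> fetched = new ArrayList<>();
        for (String service : services) {
            if (!credentials.containsKey(service)) { // no cached token for this service yet
                rpcCount++;                            // stand-in for fs.getDelegationToken(renewer)
                Token token = new Token(service, renewer);
                credentials.put(service, token);
                fetched.add(token);
            }
        }
        return fetched;
    }

    public static void main(String[] args) {
        CollectTokensSketch client = new CollectTokensSketch();
        Map<String, Token> credentials = new HashMap<>();
        List<String> paths = Arrays.asList(  // hypothetical job paths
            "hdfs://hdfsHACluster/user/hncscwc/.flink/app/flink-dist.jar",
            "hdfs://hdfsHACluster/user/hncscwc/.flink/app/job.jar");
        String renewer = "yarn/rm-host@EXAMPLE.COM"; // hypothetical RM principal
        client.obtainTokens(paths, renewer, credentials);
        client.obtainTokens(paths, renewer, credentials); // already cached: no new RPC
        System.out.println(client.rpcCount + " RPC, services=" + credentials.keySet());
        // prints: 1 RPC, services=[ha-hdfs:hdfsHACluster]
    }
}
```

The renewer is baked into the token at this point. If the principal passed here does not correspond to the user the RM later renews as, the NN rejects the renewal with the "non-matching renewer" error from the renewToken code further down.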

3) The RM adds the tokens to the delegation token renewal service

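While handling the client's submit request, the RM checks whether Kerberos is enabled. If it is, the RM parses the delegation tokens out of the launch context and adds them to the delegation token renewal service, which then renews them periodically on a timer. After that, the RM asks an NM to launch the container, and the delegation tokens travel to the NM inside the launch context.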

  // RMAppManager.java
  protected void submitApplication(
      ApplicationSubmissionContext submissionContext,
      long submitTime,
      String user)
      throws YarnException {
      ...
      if (UserGroupInformation.isSecurityEnabled()) {
          this.rmContext.getDelegationTokenRenewer().addApplicationAsync(
              applicationId,
              BuilderUtils.parseCredentials(submissionContext),
              submissionContext.getCancelTokensWhenComplete(),
              application.getUser(),
              BuilderUtils.parseTokensConf(submissionContext));
      }
      ...
  }

4) The NM uses the delegation token

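When the NM receives a container launch request, it parses the delegation tokens out of the request (the launch context). It creates an object for that container and stores the tokens in it. It then localizes the container's resources, that is, downloads the required files from HDFS, and this download uses the delegation token that was passed along. When the application finishes and log aggregation is enabled, the NM uses the same delegation token again to upload the application's logs to the designated HDFS path.

The delegation tokens are also written to a persistent file. This serves two purposes: the NM can recover after a failure, and the tokens can be handed to the container process.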

3. Renewal and lifecycle of the delegation token

1) The token's maximum lifetime is fixed when the token is issued

  // FSNamesystem.java
  Token<DelegationTokenIdentifier> getDelegationToken(Text renewer) throws IOException {
      ...
      DelegationTokenIdentifier dtId = new DelegationTokenIdentifier(owner, renewer, realUser);
      token = new Token<DelegationTokenIdentifier>(dtId, dtSecretManager);
      ...
      return token;
  }
  // Token.java
  public Token(T id, SecretManager<T> mgr) {
      password = mgr.createPassword(id);
      identifier = id.getBytes();
      kind = id.getKind();
      service = new Text();
  }
  // AbstractDelegationTokenSecretManager.java
  protected synchronized byte[] createPassword(TokenIdent identifier) {
      long now = Time.now();
      identifier.setMaxDate(now + tokenMaxLifetime);
      ...
  }
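`createPassword` stamps `maxDate = issueDate + tokenMaxLifetime` into the identifier, so no number of renewals can keep a token alive past that instant. On the NN side, `tokenMaxLifetime` comes from `dfs.namenode.delegation.token.max-lifetime`, which defaults to 7 days (604800000 ms). The sketch below simply does that arithmetic.

The issue time 1656925873393 (2022-07-04 17:11:13.393 +08:00) is my inference from this article's RM log, not a value printed in it. With the default 7-day lifetime it reproduces exactly the values the log reports: `exp=1657530673393` and `max expiration date: 2022-07-11 17:11:13,393+0800`.

```java
import java.time.Duration;
import java.time.Instant;
import java.time.OffsetDateTime;
import java.time.ZoneOffset;

// Worked arithmetic for maxDate = issueDate + tokenMaxLifetime (values from this article's logs).
final class MaxDateSketch {
    // Default of dfs.namenode.delegation.token.max-lifetime: 7 days.
    static final long DEFAULT_MAX_LIFETIME_MS = Duration.ofDays(7).toMillis();

    static long maxDate(long issueDateMs, long maxLifetimeMs) {
        return issueDateMs + maxLifetimeMs;
    }

    // The cluster in the logs runs at UTC+8.
    static OffsetDateTime toUtcPlus8(long epochMs) {
        return Instant.ofEpochMilli(epochMs).atOffset(ZoneOffset.ofHours(8));
    }

    public static void main(String[] args) {
        long issueDate = 1656925873393L; // inferred issue time of token 4361
        long max = maxDate(issueDate, DEFAULT_MAX_LIFETIME_MS);
        System.out.println(max);             // 1657530673393
        System.out.println(toUtcPlus8(max)); // 2022-07-11T17:11:13.393+08:00
    }
}
```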

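2) When the RM receives a submit request, it first renews the token once to learn its next expiration time. It then schedules a timer based on that expiration time to trigger the next renewal.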

  // DelegationTokenRenewer.java
  public void addApplicationSync(
      ApplicationId applicationId,
      Credentials ts,
      boolean shouldCancelAtEnd,
      String user)
      throws IOException, InterruptedException {
      handleAppSubmitEvent(
          new DelegationTokenRenewerAppSubmitEvent(
              applicationId, ts, shouldCancelAtEnd, user, new Configuration()));
  }
  private void handleAppSubmitEvent(AbstractDelegationTokenRenewerAppEvent evt)
      throws IOException, InterruptedException {
      ... // (elided: local variables such as applicationId, now and tokenList are set up here)
      Credentials ts = evt.getCredentials();
      Collection<Token<?>> tokens = ts.getAllTokens();
      for (Token<?> token : tokens) {
          DelegationTokenToRenew dttr = allTokens.get(token);
          if (dttr == null) {
              dttr = new DelegationTokenToRenew(
                  Arrays.asList(applicationId),
                  token, tokenConf, now, shouldCancelAtEnd,
                  evt.getUser());
              try {
                  // Renew once right away
                  renewToken(dttr);
              } catch (IOException ioe) {
                  ...
              }
          }
          tokenList.add(dttr);
      }

      if (!tokenList.isEmpty()) {
          for (DelegationTokenToRenew dtr : tokenList) {
              DelegationTokenToRenew currentDtr = allTokens.putIfAbsent(dtr.token, dtr);
              if (currentDtr != null) {
                  // another job beat us
                  currentDtr.referringAppIds.add(applicationId);
                  appTokens.get(applicationId).add(currentDtr);
              } else {
                  appTokens.get(applicationId).add(dtr);
                  setTimerForTokenRenewal(dtr);
              }
          }
      }
  }
  protected void renewToken(final DelegationTokenToRenew dttr)
      throws IOException {
      // need to use doAs so that http can find the kerberos tgt
      // NOTE: token renewers should be responsible for the correct UGI!
      try {
          // Renew the delegation token and obtain its next expiration time
          dttr.expirationDate =
              UserGroupInformation.getLoginUser().doAs(
                  new PrivilegedExceptionAction<Long>() {
                      @Override
                      public Long run() throws Exception {
                          return dttr.token.renew(dttr.conf);
                      }
                  });
      } catch (InterruptedException e) {
          throw new IOException(e);
      }
      LOG.info("Renewed delegation-token= [" + dttr + "]");
  }
  protected void setTimerForTokenRenewal(DelegationTokenToRenew token)
      throws IOException {
      // calculate timer time
      long expiresIn = token.expirationDate - System.currentTimeMillis();
      if (expiresIn <= 0) {
          LOG.info("Will not renew token " + token);
          return;
      }
      long renewIn = token.expirationDate - expiresIn / 10; // little bit before the expiration
      // need to create new task every time
      RenewalTimerTask tTask = new RenewalTimerTask(token);
      token.setTimerTask(tTask); // keep reference to the timer
      renewalTimer.schedule(token.timerTask, new Date(renewIn));
      LOG.info(
          "Renew " + token + " in " + expiresIn + " ms, appId = " +
          token.referringAppIds);
  }
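The renewal rhythm is set by one line: `renewIn = token.expirationDate - expiresIn / 10`. The timer fires once 90% of the remaining validity has elapsed. With the NN's default one-day renew interval (`dfs.namenode.delegation.token.renew-interval`), that means a renewal every 21.6 hours (21 h 36 min). This is exactly the spacing in the RM log later in this article: 07-04 17:11:13, then 07-05 14:47:13, then 07-06 12:23:13, and so on.

The sketch below isolates that computation. The class and method names are hypothetical. `now` is assumed to be the timestamp of the first "Renewed" log line, because the real code reads `System.currentTimeMillis()` a moment after the renewal.

```java
import java.time.Instant;
import java.time.ZoneOffset;
import java.util.OptionalLong;

// Sketch of the timer arithmetic in DelegationTokenRenewer.setTimerForTokenRenewal.
final class RenewalScheduleSketch {
    // Returns the absolute time of the next renewal, or empty when the token
    // has already expired (the RM logs "Will not renew token" in that case).
    static OptionalLong nextRenewalAt(long expirationDate, long now) {
        long expiresIn = expirationDate - now;
        if (expiresIn <= 0) {
            return OptionalLong.empty();
        }
        // Fire when 90% of the remaining validity has elapsed (integer division, as in the RM).
        return OptionalLong.of(expirationDate - expiresIn / 10);
    }

    public static void main(String[] args) {
        long now = 1656925873424L;        // first "Renewed" line: 2022-07-04 17:11:13,424 +08:00
        long expiration = 1657012273422L; // exp reported by that renewal (one day later)
        long next = nextRenewalAt(expiration, now).getAsLong();
        System.out.println(Instant.ofEpochMilli(next).atOffset(ZoneOffset.ofHours(8)));
        // 2022-07-05T14:47:13.423+08:00
    }
}
```

The computed time is 39 ms earlier than the renewal actually logged at 14:47:13,462. The gap is plausibly timer and RPC latency.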

Next, the details of how a renew request is sent and handled:

  // Client side (DFSClient.Renewer): send the renew request
  public long renew(Token<?> token, Configuration conf) throws IOException {
      Token<DelegationTokenIdentifier> delToken = (Token<DelegationTokenIdentifier>) token;
      ClientProtocol nn = getNNProxy(delToken, conf);
      try {
          return nn.renewDelegationToken(delToken);
      } catch (RemoteException re) {
          throw re.unwrapRemoteException(InvalidToken.class,
                  AccessControlException.class);
      }
  }
  // Server side (FSNamesystem): handle the request
  long renewDelegationToken(Token<DelegationTokenIdentifier> token)
      throws InvalidToken, IOException {
      try {
          ... // (elided: renewer is the short name of the calling user)
          expiryTime = dtSecretManager.renewToken(token, renewer);
      } catch (AccessControlException ace) {
          ...
      }
      return expiryTime;
  }
  // AbstractDelegationTokenSecretManager.java
  public synchronized long renewToken(
      Token<TokenIdent> token,
      String renewer)
      throws InvalidToken, IOException {
      ByteArrayInputStream buf = new ByteArrayInputStream(token.getIdentifier());
      DataInputStream in = new DataInputStream(buf);
      TokenIdent id = createIdentifier();
      id.readFields(in);
      LOG.info(
          "Token renewal for identifier: " + formatTokenId(id) +
          "; total currentTokens " + currentTokens.size());
      long now = Time.now();
      if (id.getMaxDate() < now) {
          throw new InvalidToken(
              renewer + " tried to renew an expired token " +
              formatTokenId(id) + " max expiration date: " +
              Time.formatTime(id.getMaxDate()) +
              " currentTime: " + Time.formatTime(now));
      }
      if ((id.getRenewer() == null) || (id.getRenewer().toString().isEmpty())) {
          throw new AccessControlException(
              renewer +
              " tried to renew a token " + formatTokenId(id) +
              " without a renewer");
      }
      if (!id.getRenewer().toString().equals(renewer)) {
          throw new AccessControlException(
              renewer +
              " tries to renew a token " + formatTokenId(id) +
              " with non-matching renewer " + id.getRenewer());
      }
      DelegationKey key = getDelegationKey(id.getMasterKeyId());
      if (key == null) {
          throw new InvalidToken(
              "Unable to find master key for keyId=" +
              id.getMasterKeyId() +
              " from cache. Failed to renew an unexpired token " +
              formatTokenId(id) + " with sequenceNumber=" +
              id.getSequenceNumber());
      }
      byte[] password = createPassword(token.getIdentifier(), key.getKey());
      if (!MessageDigest.isEqual(password, token.getPassword())) {
          throw new AccessControlException(
              renewer +
              " is trying to renew a token " +
              formatTokenId(id) + " with wrong password");
      }
      long renewTime = Math.min(id.getMaxDate(), now + tokenRenewInterval);
      String trackingId = getTrackingIdIfEnabled(id);
      DelegationTokenInformation info =
          new DelegationTokenInformation(renewTime, password, trackingId);
      if (getTokenInfo(id) == null) {
          throw new InvalidToken(
              "Renewal request for unknown token " + formatTokenId(id));
      }
      updateToken(id, info);
      return renewTime;
  }
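Leaving aside the master-key lookup and the password check, the NN-side rules that matter for this article are three:
- A token past its `maxDate` is rejected as expired.
- The caller must match the renewer recorded in the token; otherwise the renewal fails with the "tries to renew a token ... with non-matching renewer" error.
- A successful renewal returns `min(maxDate, now + renewInterval)`.

So once `now + renewInterval` passes `maxDate`, every renewal returns the same `maxDate`, and the expiration stops moving forward.

The sketch below models only those three rules, with these assumptions:
- The class and method names are hypothetical.
- `IllegalStateException` and `SecurityException` stand in for Hadoop's `InvalidToken` and `AccessControlException`.
- The 1-day interval is the default `dfs.namenode.delegation.token.renew-interval`.
- The caller name "hadoop" is taken from the error message in the RM log, and the token's renewer is assumed to be the same.

```java
import java.time.Duration;

// Simplified model of the time and renewer checks in AbstractDelegationTokenSecretManager.renewToken.
final class RenewCheckSketch {
    // Assumed default of dfs.namenode.delegation.token.renew-interval: 1 day.
    static final long RENEW_INTERVAL_MS = Duration.ofDays(1).toMillis();

    static long renew(long maxDate, String tokenRenewer, String caller, long now) {
        if (maxDate < now) {
            // Hadoop throws InvalidToken here.
            throw new IllegalStateException(caller + " tried to renew an expired token");
        }
        if (tokenRenewer == null || tokenRenewer.isEmpty()) {
            throw new SecurityException(caller + " tried to renew a token without a renewer");
        }
        if (!tokenRenewer.equals(caller)) {
            // Hadoop throws AccessControlException here.
            throw new SecurityException(
                caller + " tries to renew a token with non-matching renewer " + tokenRenewer);
        }
        // The new expiration can never go beyond maxDate.
        return Math.min(maxDate, now + RENEW_INTERVAL_MS);
    }

    public static void main(String[] args) {
        long maxDate = 1657530673393L; // 2022-07-11 17:11:13.393 +08:00, from the RM log
        // Renewal at 2022-07-04 17:11:13.422: a full day is added.
        System.out.println(renew(maxDate, "hadoop", "hadoop", 1656925873422L)); // 1657012273422
        // Renewal at 2022-07-11 00:23:13.591: capped at maxDate.
        System.out.println(renew(maxDate, "hadoop", "hadoop", 1657470193591L)); // 1657530673393
        try {
            renew(maxDate, "hadoop", "hadoop", maxDate + 1); // one millisecond too late
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage()); // hadoop tried to renew an expired token
        }
    }
}
```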

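3) What happens when a token reaches its maximum lifetime

The timer task catches any exception thrown by the renewal and simply removes the failed token.

Note, however, that before each renewal the RM first requests a new delegation token if one is needed (covered in more detail later).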

  // DelegationTokenRenewer.java, RenewalTimerTask
  public void run() {
      if (cancelled.get()) {
          return;
      }
      Token<?> token = dttr.token;
      try {
          // First check whether a new token needs to be requested
          requestNewHdfsDelegationTokenIfNeeded(dttr);
          // if the token is not replaced by a new token, renew the token
          if (!dttr.isTimerCancelled()) {
              renewToken(dttr);
              setTimerForTokenRenewal(dttr); // set the next one
          } else {
              LOG.info("The token was removed already. Token = [" + dttr + "]");
          }
      } catch (Exception e) {
          LOG.error("Exception renewing token" + token + ". Not rescheduled", e);
          removeFailedDelegationToken(dttr);
      }
  }

【Problem analysis】


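Let's go back to the logs from the failure described above and walk through what happened.

The NM log shows that, on retry, the application could not start because its resources could not be downloaded (localized). The download failed because of an invalid token.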

  2022-07-18 13:44:18,665 WARN org.apache.hadoop.ipc.Client: Exception encountered while connecting to the server : org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.security.token.SecretManager$InvalidToken): token (HDFS_DELEGATION_TOKEN token 4361 for hncscwc) can't be found in cache
  2022-07-18 13:44:18,669 WARN org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ResourceLocalizationService: { hdfs://hdfsHACluster/user/hncscwc/.flink/application_1637733238080_3800/application_1637733238080_38002636034628721129021.tmp, 1656925873322, FILE, null } failed: token (HDFS_DELEGATION_TOKEN token 4361 for hncscwc) can't be found in cache
  org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.security.token.SecretManager$InvalidToken): token (HDFS_DELEGATION_TOKEN token 4361 for hncscwc) can't be found in cache
    at org.apache.hadoop.ipc.Client.getRpcResponse(Client.java:1486)
    at org.apache.hadoop.ipc.Client.call(Client.java:1432)
    at org.apache.hadoop.ipc.Client.call(Client.java:1342)
    at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:227)
    at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:116)
    at com.sun.proxy.$Proxy15.getFileInfo(Unknown Source)
    at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.getFileInfo(ClientNamenodeProtocolTranslatorPB.java:796)
    at sun.reflect.GeneratedMethodAccessor172.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:411)
    at org.apache.hadoop.io.retry.RetryInvocationHandler$Call.invokeMethod(RetryInvocationHandler.java:165)
    at org.apache.hadoop.io.retry.RetryInvocationHandler$Call.invoke(RetryInvocationHandler.java:157)
    at org.apache.hadoop.io.retry.RetryInvocationHandler$Call.invokeOnce(RetryInvocationHandler.java:95)
    at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:348)
    at com.sun.proxy.$Proxy16.getFileInfo(Unknown Source)
    at org.apache.hadoop.hdfs.DFSClient.getFileInfo(DFSClient.java:1649)
    at org.apache.hadoop.hdfs.DistributedFileSystem$27.doCall(DistributedFileSystem.java:1440)
    at org.apache.hadoop.hdfs.DistributedFileSystem$27.doCall(DistributedFileSystem.java:1437)
    at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
    at org.apache.hadoop.hdfs.DistributedFileSystem.getFileStatus(DistributedFileSystem.java:1452)
    at org.apache.hadoop.yarn.util.FSDownload.copy(FSDownload.java:253)
    at org.apache.hadoop.yarn.util.FSDownload.access$000(FSDownload.java:63)
    at org.apache.hadoop.yarn.util.FSDownload$2.run(FSDownload.java:361)
    at org.apache.hadoop.yarn.util.FSDownload$2.run(FSDownload.java:359)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:422)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1922)
    at org.apache.hadoop.yarn.util.FSDownload.call(FSDownload.java:359)
    at org.apache.hadoop.yarn.util.FSDownload.call(FSDownload.java:62)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)

Why was the token invalid? The RM log answers that.

  2022-07-04 17:11:13,400 INFO org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler: Application 'application_1637733238080_3800' is submitted without priority hence considering default queue/cluster priority: 0
  2022-07-04 17:11:13,424 INFO org.apache.hadoop.yarn.server.resourcemanager.security.DelegationTokenRenewer: Renewed delegation-token= [Kind: HDFS_DELEGATION_TOKEN, Service: ha-hdfs:hdfsHACluster, Ident: (HDFS_DELEGATION_TOKEN token 4361 for hncscwc);exp=1657012273422; apps=[application_1637733238080_3800]]
  2022-07-05 14:47:13,462 INFO org.apache.hadoop.yarn.server.resourcemanager.security.DelegationTokenRenewer: Renewed delegation-token= [Kind: HDFS_DELEGATION_TOKEN, Service: ha-hdfs:hdfsHACluster, Ident: (HDFS_DELEGATION_TOKEN token 4361 for hncscwc);exp=1657090033446; apps=[application_1637733238080_3800]]
  2022-07-06 12:23:13,467 INFO org.apache.hadoop.yarn.server.resourcemanager.security.DelegationTokenRenewer: Renewed delegation-token= [Kind: HDFS_DELEGATION_TOKEN, Service: ha-hdfs:hdfsHACluster, Ident: (HDFS_DELEGATION_TOKEN token 4361 for hncscwc);exp=1657167793465; apps=[application_1637733238080_3800]]
  2022-07-07 09:59:13,487 INFO org.apache.hadoop.yarn.server.resourcemanager.security.DelegationTokenRenewer: Renewed delegation-token= [Kind: HDFS_DELEGATION_TOKEN, Service: ha-hdfs:hdfsHACluster, Ident: (HDFS_DELEGATION_TOKEN token 4361 for hncscwc);exp=1657245553484; apps=[application_1637733238080_3800]]
  2022-07-08 07:35:13,532 INFO org.apache.hadoop.yarn.server.resourcemanager.security.DelegationTokenRenewer: Renewed delegation-token= [Kind: HDFS_DELEGATION_TOKEN, Service: ha-hdfs:hdfsHACluster, Ident: (HDFS_DELEGATION_TOKEN token 4361 for hncscwc);exp=1657323313511; apps=[application_1637733238080_3800]]
  2022-07-09 05:11:13,551 INFO org.apache.hadoop.yarn.server.resourcemanager.security.DelegationTokenRenewer: Renewed delegation-token= [Kind: HDFS_DELEGATION_TOKEN, Service: ha-hdfs:hdfsHACluster, Ident: (HDFS_DELEGATION_TOKEN token 4361 for hncscwc);exp=1657401073532; apps=[application_1637733238080_3800]]
  2022-07-10 02:47:13,564 INFO org.apache.hadoop.yarn.server.resourcemanager.security.DelegationTokenRenewer: Renewed delegation-token= [Kind: HDFS_DELEGATION_TOKEN, Service: ha-hdfs:hdfsHACluster, Ident: (HDFS_DELEGATION_TOKEN token 4361 for hncscwc);exp=1657478833547; apps=[application_1637733238080_3800]]
  2022-07-11 00:23:13,591 INFO org.apache.hadoop.yarn.server.resourcemanager.security.DelegationTokenRenewer: Renewed delegation-token= [Kind: HDFS_DELEGATION_TOKEN, Service: ha-hdfs:hdfsHACluster, Ident: (HDFS_DELEGATION_TOKEN token 4361 for hncscwc);exp=1657530673393; apps=[application_1637733238080_3800]]
  2022-07-11 17:11:07,361 INFO org.apache.hadoop.yarn.server.resourcemanager.security.DelegationTokenRenewer: Renewed delegation-token= [Kind: HDFS_DELEGATION_TOKEN, Service: ha-hdfs:hdfsHACluster, Ident: (HDFS_DELEGATION_TOKEN token 4361 for hncscwc);exp=1657530673393; apps=[application_1637733238080_3800]]
  2022-07-11 17:11:07,361 INFO org.apache.hadoop.yarn.server.resourcemanager.security.DelegationTokenRenewer: Renew Kind: HDFS_DELEGATION_TOKEN, Service: ha-hdfs:hdfsHACluster, Ident: (HDFS_DELEGATION_TOKEN token 4361 for hncscwc);exp=1657530673393; apps=[application_1637733238080_3800] in 6032 ms, appId = [application_1637733238080_3800]
  2022-07-11 17:11:12,793 INFO org.apache.hadoop.yarn.server.resourcemanager.security.DelegationTokenRenewer: Renewed delegation-token= [Kind: HDFS_DELEGATION_TOKEN, Service: ha-hdfs:hdfsHACluster, Ident: (HDFS_DELEGATION_TOKEN token 4361 for hncscwc);exp=1657530673393; apps=[application_1637733238080_3800]]
  2022-07-11 17:11:12,793 INFO org.apache.hadoop.yarn.server.resourcemanager.security.DelegationTokenRenewer: Renew Kind: HDFS_DELEGATION_TOKEN, Service: ha-hdfs:hdfsHACluster, Ident: (HDFS_DELEGATION_TOKEN token 4361 for hncscwc);exp=1657530673393; apps=[application_1637733238080_3800] in 600 ms, appId = [application_1637733238080_3800]
  2022-07-11 17:11:13,337 INFO org.apache.hadoop.yarn.server.resourcemanager.security.DelegationTokenRenewer: Renewed delegation-token= [Kind: HDFS_DELEGATION_TOKEN, Service: ha-hdfs:hdfsHACluster, Ident: (HDFS_DELEGATION_TOKEN token 4361 for hncscwc);exp=1657530673393; apps=[application_1637733238080_3800]]
  2022-07-11 17:11:13,337 INFO org.apache.hadoop.yarn.server.resourcemanager.security.DelegationTokenRenewer: Renew Kind: HDFS_DELEGATION_TOKEN, Service: ha-hdfs:hdfsHACluster, Ident: (HDFS_DELEGATION_TOKEN token 4361 for hncscwc);exp=1657530673393; apps=[application_1637733238080_3800] in 56 ms, appId = [application_1637733238080_3800]
  2022-07-11 17:11:13,391 INFO org.apache.hadoop.yarn.server.resourcemanager.security.DelegationTokenRenewer: Renewed delegation-token= [Kind: HDFS_DELEGATION_TOKEN, Service: ha-hdfs:hdfsHACluster, Ident: (HDFS_DELEGATION_TOKEN token 4361 for hncscwc);exp=1657530673393; apps=[application_1637733238080_3800]]
  2022-07-11 17:11:13,391 INFO org.apache.hadoop.yarn.server.resourcemanager.security.DelegationTokenRenewer: Renew Kind: HDFS_DELEGATION_TOKEN, Service: ha-hdfs:hdfsHACluster, Ident: (HDFS_DELEGATION_TOKEN token 4361 for hncscwc);exp=1657530673393; apps=[application_1637733238080_3800] in 2 ms, appId = [application_1637733238080_3800]
  2022-07-11 17:11:13,398 ERROR org.apache.hadoop.yarn.server.resourcemanager.security.DelegationTokenRenewer: Exception renewing tokenKind: HDFS_DELEGATION_TOKEN, Service: ha-hdfs:hdfsHACluster, Ident: (HDFS_DELEGATION_TOKEN token 4361 for hncscwc). Not rescheduled
  org.apache.hadoop.security.token.SecretManager$InvalidToken: hadoop tried to renew an expired token (HDFS_DELEGATION_TOKEN token 4361 for hncscwc) max expiration date: 2022-07-11 17:11:13,393+0800 currentTime: 2022-07-11 17:11:13,394+0800
    at org.apache.hadoop.security.token.delegation.AbstractDelegationTokenSecretManager.renewToken(AbstractDelegationTokenSecretManager.java:499)
    at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.renewDelegationToken(FSNamesystem.java:5952)
    at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.renewDelegationToken(NameNodeRpcServer.java:675)
    at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.renewDelegationToken(ClientNamenodeProtocolServerSideTranslatorPB.java:1035)
    at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
    at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:447)
    at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:989)
    at org.apache.hadoop.ipc.Server$RpcCall.run(Server.java:850)
    at org.apache.hadoop.ipc.Server$RpcCall.run(Server.java:793)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:422)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1922)
    at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2489)
    at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
    at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
    at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
    at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
    at org.apache.hadoop.ipc.RemoteException.instantiateException(RemoteException.java:121)
    at org.apache.hadoop.ipc.RemoteException.unwrapRemoteException(RemoteException.java:88)
    at org.apache.hadoop.hdfs.DFSClient$Renewer.renew(DFSClient.java:761)
    at org.apache.hadoop.security.token.Token.renew(Token.java:458)
    at org.apache.hadoop.yarn.server.resourcemanager.security.DelegationTokenRenewer$1.run(DelegationTokenRenewer.java:601)
    at org.apache.hadoop.yarn.server.resourcemanager.security.DelegationTokenRenewer$1.run(DelegationTokenRenewer.java:598)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:422)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1922)
    at org.apache.hadoop.yarn.server.resourcemanager.security.DelegationTokenRenewer.renewToken(DelegationTokenRenewer.java:597)
    at org.apache.hadoop.yarn.server.resourcemanager.security.DelegationTokenRenewer$RenewalTimerTask.run(DelegationTokenRenewer.java:531)
    at java.util.TimerThread.mainLoop(Timer.java:555)
    at java.util.TimerThread.run(Timer.java:505)
  Caused by: org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.security.token.SecretManager$InvalidToken): hadoop tried to renew an expired token (HDFS_DELEGATION_TOKEN token 4361 for hncscwc) max expiration date: 2022-07-11 17:11:13,393+0800 currentTime: 2022-07-11 17:11:13,394+0800
    at org.apache.hadoop.security.token.delegation.AbstractDelegationTokenSecretManager.renewToken(AbstractDelegationTokenSecretManager.java:499)
    at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.renewDelegationToken(FSNamesystem.java:5952)
    at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.renewDelegationToken(NameNodeRpcServer.java:675)
    at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.renewDelegationToken(ClientNamenodeProtocolServerSideTranslatorPB.java:1035)
    at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
    at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:447)
    at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:989)
    at org.apache.hadoop.ipc.Server$RpcCall.run(Server.java:850)
    at org.apache.hadoop.ipc.Server$RpcCall.run(Server.java:793)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:422)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1922)
    at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2489)
    at org.apache.hadoop.ipc.Client.getRpcResponse(Client.java:1486)
    at org.apache.hadoop.ipc.Client.call(Client.java:1432)
    at org.apache.hadoop.ipc.Client.call(Client.java:1342)
    at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:227)
    at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:116)
    at com.sun.proxy.$Proxy94.renewDelegationToken(Unknown Source)
    at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.renewDelegationToken(ClientNamenodeProtocolTranslatorPB.java:964)
    at sun.reflect.GeneratedMethodAccessor277.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:411)
    at org.apache.hadoop.io.retry.RetryInvocationHandler$Call.invokeMethod(RetryInvocationHandler.java:165)
    at org.apache.hadoop.io.retry.RetryInvocationHandler$Call.invoke(RetryInvocationHandler.java:157)
    at org.apache.hadoop.io.retry.RetryInvocationHandler$Call.invokeOnce(RetryInvocationHandler.java:95)
    at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:348)
    at com.sun.proxy.$Proxy95.renewDelegationToken(Unknown Source)
    at org.apache.hadoop.hdfs.DFSClient$Renewer.renew(DFSClient.java:759)
    ... 10 more
  2022-07-11 17:11:13,399 ERROR org.apache.hadoop.yarn.server.resourcemanager.security.DelegationTokenRenewer: removing failed delegation token for appid=[application_1637733238080_3800];t=ha-hdfs:hdfsHACluster

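As the log above shows, after the job was submitted its delegation token was renewed every day, but once the job had been running for 7 days the renewal failed and the token became invalid. After that, the NN deletes the invalid token internally. From then on, if the job fails and has to be retried, or finishes and needs log aggregation, it still uses that invalid token to access HDFS; the NN cannot find the token, throws an exception, and the operation fails.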
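To see why the renewals stop working around day 7, here is a minimal Java model of the lifetime rules using the defaults from the configuration table in this article (renew-interval 1 day, max-lifetime 7 days). It assumes that each renewal sets the expiry to `min(now + renewInterval, maxDate)` and that renewal is refused once `now` is past `maxDate`. `TokenLifetimeModel` and its methods are hypothetical; this is a sketch of the idea, not Hadoop's secret manager code.

```java
import java.util.concurrent.TimeUnit;

/**
 * Hypothetical, simplified model of delegation-token lifetime rules
 * (an illustration only, not Hadoop's implementation).
 */
public class TokenLifetimeModel {
    static final long DAY = TimeUnit.DAYS.toMillis(1);

    final long renewInterval; // models dfs.namenode.delegation.token.renew-interval
    final long maxDate;       // issue time + dfs.namenode.delegation.token.max-lifetime
    long expirationDate;      // pushed forward by each successful renewal

    TokenLifetimeModel(long issueDate, long renewInterval, long maxLifetime) {
        this.renewInterval = renewInterval;
        this.maxDate = issueDate + maxLifetime;
        this.expirationDate = issueDate + renewInterval;
    }

    /** Renews at time {@code now}; refuses once {@code now} is past the max date. */
    long renew(long now) {
        if (now > maxDate) {
            throw new IllegalStateException("past max date, renewal refused");
        }
        // Assumption: a renewal never extends the token beyond its max date.
        expirationDate = Math.min(maxDate, now + renewInterval);
        return expirationDate;
    }

    public static void main(String[] args) {
        // Defaults: renew-interval = 1 day, max-lifetime = 7 days, token issued at t = 0.
        TokenLifetimeModel token = new TokenLifetimeModel(0, DAY, 7 * DAY);
        for (int day = 1; day <= 8; day++) {
            try {
                long exp = token.renew(day * DAY);
                System.out.println("day " + day + ": renewed, expires at day " + exp / DAY);
            } catch (IllegalStateException e) {
                System.out.println("day " + day + ": " + e.getMessage());
            }
        }
    }
}
```

Under these assumptions the renewals on days 1 through 6 each buy one more day, the day-7 renewal can only push the expiry to day 7 itself, and the day-8 attempt is refused. No amount of renewing keeps the token alive past its max lifetime, which is consistent with the failure in the log above.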

【Solution】


The simplest and most direct fix is to increase the maximum lifetime of delegation tokens.

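At first, though, this approach seemed rather crude, especially for long-running Flink streaming jobs: there is no way to know in advance how long such a job will run, so there is no way to know what maximum lifetime to set.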

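So I went back to the source code and found that, for a delegation token that is about to expire (i.e. about to exceed its maximum lifetime), the RM can request a brand-new token when needed. This is done by the requestNewHdfsDelegationTokenIfNeeded method, which the timer thread calls before it renews the token.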

Here is the implementation:

    // DelegationTokenRenewer.java
    private void requestNewHdfsDelegationTokenIfNeeded(
        final DelegationTokenToRenew dttr)
        throws IOException, InterruptedException {
        // Proxy-user privileges enabled AND the token is close to its max lifetime
        // AND the token is an HDFS delegation token
        if (hasProxyUserPrivileges &&
            dttr.maxDate - dttr.expirationDate < credentialsValidTimeRemaining &&
            dttr.token.getKind().equals(HDFS_DELEGATION_KIND)) {
            final Collection<ApplicationId> applicationIds;
            synchronized (dttr.referringAppIds) {
                applicationIds = new HashSet<>(dttr.referringAppIds);
                dttr.referringAppIds.clear();
            }
            // remove all old expiring hdfs tokens for this application.
            for (ApplicationId appId : applicationIds) {
                Set<DelegationTokenToRenew> tokenSet = appTokens.get(appId);
                if (tokenSet == null || tokenSet.isEmpty()) {
                    continue;
                }
                Iterator<DelegationTokenToRenew> iter = tokenSet.iterator();
                synchronized (tokenSet) {
                    while (iter.hasNext()) {
                        DelegationTokenToRenew t = iter.next();
                        if (t.token.getKind().equals(HDFS_DELEGATION_KIND)) {
                            iter.remove();
                            allTokens.remove(t.token);
                            t.cancelTimer();
                            LOG.info("Removed expiring token " + t);
                        }
                    }
                }
            }
            LOG.info("Token= (" + dttr + ") is expiring, request new token.");
            requestNewHdfsDelegationTokenAsProxyUser(
                applicationIds, dttr.user,
                dttr.shouldCancelAtEnd);
        }
    }
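The trigger condition in the `if` statement above can be restated as a pure function to see when it fires under the defaults from the configuration table (max-lifetime 7 days, `valid-time-remaining` 10800000 ms, i.e. 3 hours). This is a simplified sketch, not YARN code: `NewTokenTrigger` is hypothetical, and the token kind is compared as a plain `String` instead of Hadoop's `Text`.

```java
import java.util.concurrent.TimeUnit;

/** Hypothetical restatement of the condition in requestNewHdfsDelegationTokenIfNeeded. */
public class NewTokenTrigger {
    // String stand-in for the HDFS delegation token kind.
    static final String HDFS_DELEGATION_KIND = "HDFS_DELEGATION_TOKEN";

    static boolean shouldRequestNewToken(boolean hasProxyUserPrivileges,
                                         long maxDate, long expirationDate,
                                         long credentialsValidTimeRemaining,
                                         String tokenKind) {
        return hasProxyUserPrivileges
                && maxDate - expirationDate < credentialsValidTimeRemaining
                && HDFS_DELEGATION_KIND.equals(tokenKind);
    }

    public static void main(String[] args) {
        long day = TimeUnit.DAYS.toMillis(1);
        long maxDate = 7 * day;          // token issued at t = 0, max-lifetime 7 days
        long remaining = 10_800_000L;    // valid-time-remaining default: 3 hours

        // Current expiry is day 6: still a full day before maxDate, so no new token yet.
        System.out.println(shouldRequestNewToken(true, maxDate, 6 * day, remaining, HDFS_DELEGATION_KIND));
        // Expiry has been capped at maxDate (gap = 0 < 3 hours): request a new token.
        System.out.println(shouldRequestNewToken(true, maxDate, 7 * day, remaining, HDFS_DELEGATION_KIND));
        // Same token, but proxy-user privileges disabled (the default): never replaced.
        System.out.println(shouldRequestNewToken(false, maxDate, 7 * day, remaining, HDFS_DELEGATION_KIND));
    }
}
```

This prints `false`, `true`, `false`. Because the RM evaluates the check only when the timer fires before a renewal, the replacement happens near the end of the token's life rather than at an exact moment. With `yarn.resourcemanager.proxy-user-privileges.enabled` left at `false`, the condition can never be true.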

Once the new token has been obtained, the RM updates it internally and then pushes it to the NMs in their heartbeat responses.

    // DelegationTokenRenewer.java
    private void requestNewHdfsDelegationTokenAsProxyUser(
        ...
        // Get new hdfs tokens for this user
        Credentials credentials = new Credentials();
        Token<?>[] newTokens = obtainSystemTokensForUser(user, credentials);
        DataOutputBuffer dob = new DataOutputBuffer();
        credentials.writeTokenStorageToStream(dob);
        ByteBuffer byteBuffer = ByteBuffer.wrap(dob.getData(), 0, dob.getLength());
        for (ApplicationId applicationId : referringAppIds) {
            // Update the app's delegation tokens;
            // they are synchronized to the NMs on their heartbeats
            rmContext.getSystemCredentialsForApps().put(applicationId, byteBuffer);
        }
    }

    // ResourceTrackerService.java
    public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request)
        throws YarnException, IOException {
        ...
        ConcurrentMap<ApplicationId, ByteBuffer> systemCredentials =
            rmContext.getSystemCredentialsForApps();
        if (!systemCredentials.isEmpty()) {
            nodeHeartBeatResponse.setSystemCredentialsForApps(systemCredentials);
        }
        ...
    }

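The NM parses the tokens out of the heartbeat response and keeps the updated copy in memory. They are used later for resource localization when a job attempt is relaunched, and for log aggregation when the job finishes.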
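The RM-to-NM propagation described above can be modeled with plain Java collections. In this hypothetical sketch, `Rm` and `Nm` stand in for the ResourceManager and NodeManager, an application id is a `String`, and the serialized credentials are arbitrary bytes. It only mirrors the shape of the flow (the RM keeps one `ByteBuffer` per application, every heartbeat response carries the current map, the NM keeps the latest copy) and does not model cleanup of finished applications.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

/** Hypothetical in-memory model of RM -> NM token propagation via heartbeats. */
public class HeartbeatTokenSync {

    /** RM side: latest serialized credentials per application id. */
    static class Rm {
        final ConcurrentMap<String, ByteBuffer> systemCredentials = new ConcurrentHashMap<>();

        void onNewToken(String appId, byte[] serializedCredentials) {
            systemCredentials.put(appId, ByteBuffer.wrap(serializedCredentials));
        }

        /** Each heartbeat response carries a snapshot of the current map. */
        Map<String, ByteBuffer> nodeHeartbeat() {
            return new HashMap<>(systemCredentials);
        }
    }

    /** NM side: keeps the most recent credentials it has received for each app. */
    static class Nm {
        final Map<String, ByteBuffer> appCredentials = new HashMap<>();

        void onHeartbeatResponse(Map<String, ByteBuffer> systemCredentials) {
            appCredentials.putAll(systemCredentials);
        }

        /** What localization or log aggregation would use for this app. */
        String credentialsFor(String appId) {
            ByteBuffer buf = appCredentials.get(appId);
            // duplicate() so that reading does not move the shared buffer's position
            return buf == null ? null : StandardCharsets.UTF_8.decode(buf.duplicate()).toString();
        }
    }

    public static void main(String[] args) {
        Rm rm = new Rm();
        Nm nm = new Nm();
        rm.onNewToken("application_1", "token-v1".getBytes(StandardCharsets.UTF_8));
        nm.onHeartbeatResponse(rm.nodeHeartbeat());
        System.out.println(nm.credentialsFor("application_1")); // token-v1

        // The RM obtains a replacement token; the NM sees it only after the next heartbeat.
        rm.onNewToken("application_1", "token-v2".getBytes(StandardCharsets.UTF_8));
        System.out.println(nm.credentialsFor("application_1")); // still token-v1
        nm.onHeartbeatResponse(rm.nodeHeartbeat());
        System.out.println(nm.credentialsFor("application_1")); // token-v2
    }
}
```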

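Note: only resource localization and log aggregation were named above as users of the refreshed token. Does a task that is already running pick up the new token?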

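No (at least not in 2.x). The reason is that the tokens are written to a file on disk, and the task process reads that file when it starts and uses the tokens it finds there. The renewed delegation token is not written back to that file; and even if it were, there would also have to be a mechanism to tell the task process to re-read the file. So a running task that keeps accessing HDFS after its token has expired will still get an exception.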
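The point about the token file can be illustrated with plain Java file I/O. In Hadoop, a container process typically locates this file through the `HADOOP_TOKEN_FILE_LOCATION` environment variable and loads it when `UserGroupInformation` creates the login user; the sketch below does not use Hadoop at all. `TokenFileSnapshot` is a hypothetical class that reads the file once in its constructor, the way a task reads its tokens once at startup.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;

/** Hypothetical illustration: a task that reads its token file once at startup. */
public class TokenFileSnapshot {
    private final Path tokenFile;
    private final byte[] loadedAtStartup;

    TokenFileSnapshot(Path tokenFile) throws IOException {
        this.tokenFile = tokenFile;
        // Read exactly once, like a task loading its tokens when it starts.
        this.loadedAtStartup = Files.readAllBytes(tokenFile);
    }

    /** What the running task actually uses: the startup snapshot. */
    String tokenInUse() {
        return new String(loadedAtStartup, StandardCharsets.UTF_8);
    }

    /** What is currently on disk; the task itself never looks at this again. */
    String tokenOnDisk() throws IOException {
        return new String(Files.readAllBytes(tokenFile), StandardCharsets.UTF_8);
    }

    public static void main(String[] args) throws IOException {
        Path file = Files.createTempFile("container_tokens", ".bin");
        Files.write(file, "token-v1".getBytes(StandardCharsets.UTF_8));
        TokenFileSnapshot task = new TokenFileSnapshot(file);

        // Even if something later rewrote the file with a refreshed token...
        Files.write(file, "token-v2".getBytes(StandardCharsets.UTF_8));

        System.out.println("in use:  " + task.tokenInUse());  // still token-v1
        System.out.println("on disk: " + task.tokenOnDisk()); // token-v2
        Files.delete(file);
    }
}
```

In 2.x the rewrite step does not happen at all; the sketch shows that even a rewrite would not be enough without some way to tell the process to reload.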

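In addition, I noticed related code changes in the latest 3.x versions that appear to notify running containers, but I haven't looked into the details yet and will investigate when I have time.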

【Related Configuration】


The configuration properties related to delegation tokens are:

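| Property | Default | Description |
| --- | --- | --- |
| dfs.namenode.delegation.key.update-interval | 1 day (86400000 ms) | Interval at which the NN rolls the master key used to generate delegation tokens |
| dfs.namenode.delegation.token.renew-interval | 1 day (86400000 ms) | Renewal interval of a delegation token |
| dfs.namenode.delegation.token.max-lifetime | 7 days (604800000 ms) | Maximum lifetime of a delegation token |
| yarn.resourcemanager.delegation-token.always-cancel | false | Whether the RM always cancels an application's delegation tokens when the application finishes |
| yarn.resourcemanager.proxy-user-privileges.enabled | false | Whether the RM has proxy-user privileges, which lets it request a new token when a delegation token is about to reach its max lifetime |
| yarn.resourcemanager.system-credentials.valid-time-remaining | 10800000 (3 hours) | How long before the token's max lifetime the RM requests a new token, in milliseconds |
| yarn.resourcemanager.delegation-token-renewer.thread-count | 50 | Number of threads in the RM's delegation token renewer |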

【Summary】


Starting from a real-world problem, this article walked through the principles behind Hadoop delegation tokens with reference to the source code.

If anything in this article is wrong, corrections are welcome.

That's all for this article. If you found it helpful, please like and share it, and feel free to add me on WeChat to discuss.

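Disclaimer: This content was contributed by a user and does not represent the position of the wpsshop blog. Copyright belongs to the original author, and the site assumes no legal liability for it. If you find infringing content, please contact us. When reposting, please credit the source: https://www.wpsshop.cn/w/Monodyee/article/detail/265846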