A few things about the hdfs command

Background

Date: 2021-02-26 13:58:26

Brief steps:

  1. Determine what the command is.
  2. Determine where the command lives.
  3. Determine which main class the command runs.
  4. Follow the execution flow.

Example:
We noticed that the storage metrics from hdfs dfsadmin -report disagree with the output of hdfs dfs -du -h /, so we need to compare the two commands' accounting logic.

  1. Determine where the command lives: which hdfs

[ops@m-onedata bin]$ which hdfs
/usr/bin/hdfs

  2. Inspect the script: cat /usr/bin/hdfs

exec /usr/.../hadoop-hdfs/bin/hdfs.distro "$@"

This reveals where the script that actually runs lives; keep following it:

elif [ "$COMMAND" = "dfs" ] ; then
CLASS=org.apache.hadoop.fs.FsShell
HADOOP_OPTS="$HADOOP_OPTS $HADOOP_CLIENT_OPTS"
elif [ "$COMMAND" = "dfsadmin" ] ; then
CLASS=org.apache.hadoop.hdfs.tools.DFSAdmin
HADOOP_OPTS="$HADOOP_OPTS $HADOOP_CLIENT_OPTS"
elif [ "$COMMAND" = "haadmin" ] ; then
CLASS=org.apache.hadoop.hdfs.tools.DFSHAAdmin
CLASSPATH=${CLASSPATH}:${TOOL_PATH}
HADOOP_OPTS="$HADOOP_OPTS $HADOOP_CLIENT_OPTS"
elif [ "$COMMAND" = "fsck" ] ; then
CLASS=org.apache.hadoop.hdfs.tools.DFSck
HADOOP_OPTS="$HADOOP_OPTS $HADOOP_CLIENT_OPTS"
elif [ "$COMMAND" = "balancer" ] ; then
CLASS=org.apache.hadoop.hdfs.server.balancer.Balancer
HADOOP_OPTS="$HADOOP_OPTS $HADOOP_BALANCER_OPTS"ui

exec "$JAVA" -Dproc_$COMMAND $JAVA_HEAP_MAX $HADOOP_OPTS $CLASS "$@"this

  3. Find the main classes actually executed: FsShell and DFSAdmin (see the sketch after this list).
  4. Walk through their execution flow.
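
As the script shows, hdfs dfs ... ultimately execs the JVM with org.apache.hadoop.fs.FsShell as the main class. A minimal sketch of driving FsShell the same way from Java, assuming the Hadoop client libraries are on the classpath (the RunDu class name is ours, not Hadoop's):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FsShell;
import org.apache.hadoop.util.ToolRunner;

public class RunDu {
  public static void main(String[] args) throws Exception {
    // Equivalent of `hdfs dfs -du -h /`: ToolRunner drives FsShell.run()
    // with the remaining command-line arguments, just like main() does.
    int exitCode = ToolRunner.run(new FsShell(new Configuration()),
        new String[] {"-du", "-h", "/"});
    System.exit(exitCode);
  }
}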

To recap, the numbers from hdfs dfsadmin -report and hdfs dfs -du -h / do not line up:

[hadoop@123-onedata ~]$ hdfs dfsadmin -report
Configured Capacity: 157902726414336 (143.61 TB)
Present Capacity: 143526582003116 (130.54 TB)
DFS Remaining: 18651259047864 (16.96 TB)
DFS Used: 124875322955252 (113.57 TB)
DFS Used%: 87.01%
Under replicated blocks: 209619
Blocks with corrupt replicas: 0
Missing blocks: 209619
Missing blocks (with replication factor 1): 0
[hadoop@123-onedata ~]$ hdfs dfs -du -h /
33.8 G   /mr-history
74.9 M   /spark2-history
18.6 T   /user

Troubleshooting process

Start from the main classes actually executed: FsShell and DFSAdmin.

hdfs dfs -du

FsShell

/**
   * main() has some simple utility methods
   * @param argv the command and its arguments
   * @throws Exception upon error
   */
  public static void main(String argv[]) throws Exception {
    FsShell shell = newShellInstance();// create an FsShell instance
    Configuration conf = new Configuration();
    conf.setQuietMode(false);
    shell.setConf(conf);
    int res;
    try {
      res = ToolRunner.run(shell, argv);// invoke FsShell's run() method
    } finally {
      shell.close();
    }
    System.exit(res);
  }
// ------------------------------- divider ---------------------------------------
  /**
   * run
   */
  @Override
  public int run(String[] argv) {
    // initialize FsShell
    init();
    Tracer tracer = new Tracer.Builder("FsShell").
        conf(TraceUtils.wrapHadoopConf(SHELL_HTRACE_PREFIX, getConf())).
        build();
    int exitCode = -1;
    if (argv.length < 1) {
      printUsage(System.err);
    } else {
      String cmd = argv[0];
      Command instance = null;
      try {
        instance = commandFactory.getInstance(cmd);// create the Command object matching the argument
        if (instance == null) {
          throw new UnknownCommandException();
        }
        TraceScope scope = tracer.newScope(instance.getCommandName());
        if (scope.getSpan() != null) {
          String args = StringUtils.join(" ", argv);
          if (args.length() > 2048) {
            args = args.substring(0, 2048);
          }
          scope.getSpan().addKVAnnotation("args", args);
        }
        try {
          exitCode = instance.run(Arrays.copyOfRange(argv, 1, argv.length));// run the command
        } finally {
          scope.close();
        }
      } catch (IllegalArgumentException e) {
        if (e.getMessage() == null) {
          displayError(cmd, "Null exception message");
          e.printStackTrace(System.err);
        } else {
          displayError(cmd, e.getLocalizedMessage());
        }
        printUsage(System.err);
        if (instance != null) {
          printInstanceUsage(System.err, instance);
        }
      } catch (Exception e) {
        // instance.run catches IOE, so something is REALLY wrong if here
        LOG.debug("Error", e);
        displayError(cmd, "Fatal internal error");
        e.printStackTrace(System.err);
      }
    }
    tracer.close();
    return exitCode;
  }

CommandFactory

/**
   * Get an instance of the requested command
   * @param cmdName name of the command to lookup
   * @param conf the hadoop configuration
   * @return the {@link Command} or null if the command is unknown
   */
  public Command getInstance(String cmdName, Configuration conf) {
    if (conf == null) throw new NullPointerException("configuration is null");
    
    Command instance = objectMap.get(cmdName);// command instances registered up front
    if (instance == null) {
      Class<? extends Command> cmdClass = classMap.get(cmdName);// 若是沒有那麼按照命令執行反射去建立命令的對象
      if (cmdClass != null) {
        instance = ReflectionUtils.newInstance(cmdClass, conf);
        instance.setName(cmdName);
        instance.setCommandFactory(this);
      }
    }
    return instance;
  }
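
The lookup above is a plain map-plus-reflection dispatch: FsShell.init() registers the FsCommand subclasses with this factory, which is why every subcommand resolves through this one method. A stripped-down sketch of the same pattern, using a hypothetical SimpleFactory (not Hadoop's actual class) with Runnable standing in for Command:

import java.util.HashMap;
import java.util.Map;

class SimpleFactory {
  // Mirrors classMap in CommandFactory: command name -> implementing class.
  private final Map<String, Class<? extends Runnable>> classMap = new HashMap<>();

  void register(String name, Class<? extends Runnable> clazz) {
    classMap.put(name, clazz);
  }

  // Like ReflectionUtils.newInstance: instantiate lazily via reflection.
  Runnable getInstance(String name) throws ReflectiveOperationException {
    Class<? extends Runnable> clazz = classMap.get(name);
    return clazz == null ? null : clazz.getDeclaredConstructor().newInstance();
  }
}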

Since each subcommand runs through its corresponding Command object, we just need to find that object. The Command inheritance tree looks like this:

DFSAdminCommand in DFSAdmin (org.apache.hadoop.hdfs.tools)
    SetSpaceQuotaCommand in DFSAdmin (org.apache.hadoop.hdfs.tools)
    ClearSpaceQuotaCommand in DFSAdmin (org.apache.hadoop.hdfs.tools)
    SetQuotaCommand in DFSAdmin (org.apache.hadoop.hdfs.tools)
    ClearQuotaCommand in DFSAdmin (org.apache.hadoop.hdfs.tools)
FsCommand (org.apache.hadoop.fs.shell)
    Stat (org.apache.hadoop.fs.shell)
    SetfaclCommand in AclCommands (org.apache.hadoop.fs.shell)
    Mkdir (org.apache.hadoop.fs.shell)
    Display (org.apache.hadoop.fs.shell)
    FsShellPermissions (org.apache.hadoop.fs)
    Help in FsShell (org.apache.hadoop.fs)
    Truncate (org.apache.hadoop.fs.shell)
    Ls (org.apache.hadoop.fs.shell)
    Usage in FsShell (org.apache.hadoop.fs)
    Rmdir in Delete (org.apache.hadoop.fs.shell)
    DeleteSnapshot in SnapshotCommands (org.apache.hadoop.fs.shell)
    Find (org.apache.hadoop.fs.shell.find)
    InterruptCommand in TestFsShellReturnCode (org.apache.hadoop.fs)
    SetReplication (org.apache.hadoop.fs.shell)
    Count (org.apache.hadoop.fs.shell)
    SnapshotCommands (org.apache.hadoop.fs.shell)
    AclCommands (org.apache.hadoop.fs.shell)
    TouchCommands (org.apache.hadoop.fs.shell)
    RenameSnapshot in SnapshotCommands (org.apache.hadoop.fs.shell)
    FsUsage (org.apache.hadoop.fs.shell)
    XAttrCommands (org.apache.hadoop.fs.shell)
    Merge in CopyCommands (org.apache.hadoop.fs.shell)
    MoveToLocal in MoveCommands (org.apache.hadoop.fs.shell)
    GetfattrCommand in XAttrCommands (org.apache.hadoop.fs.shell)
    CommandWithDestination (org.apache.hadoop.fs.shell)
    Rm in Delete (org.apache.hadoop.fs.shell)
    Expunge in Delete (org.apache.hadoop.fs.shell)
    CreateSnapshot in SnapshotCommands (org.apache.hadoop.fs.shell)
    GetfaclCommand in AclCommands (org.apache.hadoop.fs.shell)
    SetfattrCommand in XAttrCommands (org.apache.hadoop.fs.shell)
    Head (org.apache.hadoop.fs.shell)
    Tail (org.apache.hadoop.fs.shell)
    Test (org.apache.hadoop.fs.shell)

We can easily find the Command corresponding to du using IDEA's class search.


Du

@Override
    protected void processPath(PathData item) throws IOException {
      // accumulate the sizes of all files by walking the directory's children
      ContentSummary contentSummary = item.fs.getContentSummary(item.path);
      long length = contentSummary.getLength();
      long spaceConsumed = contentSummary.getSpaceConsumed();
      if (excludeSnapshots) {
        length -= contentSummary.getSnapshotLength();
        spaceConsumed -= contentSummary.getSnapshotSpaceConsumed();
      }
      getUsagesTable().addRow(formatSize(length),
          formatSize(spaceConsumed), item);
    }
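
So -du is, at its core, one getContentSummary call per argument path. A minimal sketch that reproduces the two columns of hdfs dfs -du / against the same public API (DuLike is our own illustration, assuming the client configuration points at a reachable cluster):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.ContentSummary;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class DuLike {
  public static void main(String[] args) throws Exception {
    FileSystem fs = FileSystem.get(new Configuration());
    // One row per child of "/", like `hdfs dfs -du /`: the logical
    // length first, then space consumed including replication.
    for (FileStatus child : fs.listStatus(new Path("/"))) {
      ContentSummary cs = fs.getContentSummary(child.getPath());
      System.out.printf("%-14d %-14d %s%n",
          cs.getLength(), cs.getSpaceConsumed(), child.getPath());
    }
  }
}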

FileSystem

/** Return the {@link ContentSummary} of a given {@link Path}.
   * @param f path to use
   * @throws FileNotFoundException if the path does not resolve
   * @throws IOException IO failure
   */
  public ContentSummary getContentSummary(Path f) throws IOException {
    FileStatus status = getFileStatus(f);
    if (status.isFile()) {
      // f is a file
      long length = status.getLen();
      return new ContentSummary.Builder().length(length).
          fileCount(1).directoryCount(0).spaceConsumed(length).build();
    }
    // f is a directory
    long[] summary = {0, 0, 1};
    for(FileStatus s : listStatus(f)) {
      long length = s.getLen();
      ContentSummary c = s.isDirectory() ? getContentSummary(s.getPath()) :
          new ContentSummary.Builder().length(length).
          fileCount(1).directoryCount(0).spaceConsumed(length).build();
      summary[0] += c.getLength();
      summary[1] += c.getFileCount();
      summary[2] += c.getDirectoryCount();
    }
    // in the end we get the total directory count, total file size, and total file count
    return new ContentSummary.Builder().length(summary[0]).
        fileCount(summary[1]).directoryCount(summary[2]).
        spaceConsumed(summary[0]).build();
  }
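
Worth noting: this recursive implementation is the generic FileSystem fallback, which walks the tree client-side via listStatus. DistributedFileSystem overrides getContentSummary to issue a single getContentSummary RPC to the NameNode, so for HDFS the same aggregation runs server-side. Either way, the numbers come from file lengths recorded in the namespace metadata, not from datanode disk usage.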

hdfs dfsadmin -report

DFSAdmin

/**
   * main() has some simple utility methods.
   * @param argv Command line parameters.
   * @exception Exception if the filesystem does not exist.
   */
  public static void main(String[] argv) throws Exception {
    int res = ToolRunner.run(new DFSAdmin(), argv);// actually invokes the command object's run() method
    System.exit(res);
  }

  /**
   * @param argv The parameters passed to this program.
   * @exception Exception if the filesystem does not exist.
   * @return 0 on success, non zero on error.
   */
  @Override
  public int run(String[] argv) {

    if (argv.length < 1) {
      printUsage("");
      return -1;
    }
    // ... some validation code omitted
    try {
      if ("-report".equals(cmd)) {
        report(argv, i);// clearly this dispatches to the report() method
      } else if ("-safemode".equals(cmd)) {
          
      }
    }
    // ... some branching code omitted

   }
    
 /**
   * Gives a report on how the FileSystem is doing. This is where the report is built, i.e. where the command's output gets printed.
   * @exception IOException if the filesystem does not exist.
   */
  public void report(String[] argv, int i) throws IOException {
    DistributedFileSystem dfs = getDFS();
    FsStatus ds = dfs.getStatus();
    long capacity = ds.getCapacity();
    long used = ds.getUsed();// focus on how used capacity is computed; the logic here is fixed, so we need to see how FsStatus gets built
    long remaining = ds.getRemaining();
    long bytesInFuture = dfs.getBytesWithFutureGenerationStamps();
    long presentCapacity = used + remaining;
    boolean mode = dfs.setSafeMode(HdfsConstants.SafeModeAction.SAFEMODE_GET);
    if (mode) {
      System.out.println("Safe mode is ON");
      if (bytesInFuture > 0) {
        System.out.println("\nWARNING: ");
        System.out.println("Name node has detected blocks with generation " +
            "stamps in future.");
        System.out.println("Forcing exit from safemode will cause " +
            bytesInFuture + " byte(s) to be deleted.");
        System.out.println("If you are sure that the NameNode was started with"
            + " the correct metadata files then you may proceed with " +
            "'-safemode forceExit'\n");
      }
    }
    System.out.println("Configured Capacity: " + capacity
                       + " (" + StringUtils.byteDesc(capacity) + ")");
    System.out.println("Present Capacity: " + presentCapacity
        + " (" + StringUtils.byteDesc(presentCapacity) + ")");
    System.out.println("DFS Remaining: " + remaining
        + " (" + StringUtils.byteDesc(remaining) + ")");
    System.out.println("DFS Used: " + used
                       + " (" + StringUtils.byteDesc(used) + ")");
    double dfsUsedPercent = 0;
    if (presentCapacity != 0) {
      dfsUsedPercent = used/(double)presentCapacity;
    }
    System.out.println("DFS Used%: "
        + StringUtils.formatPercent(dfsUsedPercent, 2));

    /* These counts are not always upto date. They are updated after  
     * iteration of an internal list. Should be updated in a few seconds to 
     * minutes. Use "-metaSave" to list of all such blocks and accurate 
     * counts.
     */
    ReplicatedBlockStats replicatedBlockStats =
        dfs.getClient().getNamenode().getReplicatedBlockStats();
    System.out.println("Replicated Blocks:");
    System.out.println("\tUnder replicated blocks: " +
        replicatedBlockStats.getLowRedundancyBlocks());
    System.out.println("\tBlocks with corrupt replicas: " +
        replicatedBlockStats.getCorruptBlocks());
    System.out.println("\tMissing blocks: " +
        replicatedBlockStats.getMissingReplicaBlocks());
    System.out.println("\tMissing blocks (with replication factor 1): " +
        replicatedBlockStats.getMissingReplicationOneBlocks());
    if (replicatedBlockStats.hasHighestPriorityLowRedundancyBlocks()) {
      System.out.println("\tLow redundancy blocks with highest priority " +
          "to recover: " +
          replicatedBlockStats.getHighestPriorityLowRedundancyBlocks());
    }
// some irrelevant code omitted here
  }

DistributedFileSystem

  // invoked via FileSystem.getStatus()
  @Override
  public FsStatus getStatus(Path p) throws IOException {
    statistics.incrementReadOps(1);
    storageStatistics.incrementOpCounter(OpType.GET_STATUS);
    return dfs.getDiskStatus();
  }

DFSClient

/**
   * @see ClientProtocol#getStats()
   */
  public FsStatus getDiskStatus() throws IOException {
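    // getStateByIndex reads the long[] returned by ClientProtocol#getStats();
    // by convention, indexes 0, 1, 2 are capacity, used, and remaining.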
    return new FsStatus(getStateByIndex(0),
        getStateByIndex(1), getStateByIndex(2));
  }

NameNodeRpcServer

@Override // ClientProtocol
  public long[] getStats() throws IOException {
    checkNNStartup();
    namesystem.checkOperation(OperationCategory.READ);
    return namesystem.getStats();
  }

FSNamesystem

/** @see ClientProtocol#getStats() */
  long[] getStats() {
    final long[] stats = datanodeStatistics.getStats();// you can see the datanode statistics are collected and aggregated here
    stats[ClientProtocol.GET_STATS_LOW_REDUNDANCY_IDX] =
        getLowRedundancyBlocks();
    stats[ClientProtocol.GET_STATS_CORRUPT_BLOCKS_IDX] =
        getCorruptReplicaBlocks();
    stats[ClientProtocol.GET_STATS_MISSING_BLOCKS_IDX] =
        getMissingBlocksCount();
    stats[ClientProtocol.GET_STATS_MISSING_REPL_ONE_BLOCKS_IDX] =
        getMissingReplOneBlocksCount();
    stats[ClientProtocol.GET_STATS_BYTES_IN_FUTURE_BLOCKS_IDX] =
        blockManager.getBytesInFuture();
    stats[ClientProtocol.GET_STATS_PENDING_DELETION_BLOCKS_IDX] =
        blockManager.getPendingDeletionBlocksCount();
    return stats;
  }
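
That settles the discrepancy: hdfs dfs -du sums logical file lengths out of the NameNode's namespace metadata, while the DFS Used figure in hdfs dfsadmin -report is aggregated from the raw used-space numbers the datanodes report, which count every replica of every block (and can include block files the namespace no longer references, such as replicas pending deletion). They are different measurements and need not agree. A minimal sketch that prints both side by side through the public API (UsageCompare is our own name; assumes the Hadoop client configuration points at the cluster):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.ContentSummary;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FsStatus;
import org.apache.hadoop.fs.Path;

public class UsageCompare {
  public static void main(String[] args) throws Exception {
    FileSystem fs = FileSystem.get(new Configuration());

    // The dfsadmin -report path: FsStatus comes from datanode-reported
    // raw usage (all replicas), via ClientProtocol#getStats().
    FsStatus status = fs.getStatus();
    System.out.println("DFS Used (raw, all replicas): " + status.getUsed());

    // The dfs -du path: ContentSummary comes from namespace metadata;
    // length is logical, spaceConsumed factors in replication.
    ContentSummary cs = fs.getContentSummary(new Path("/"));
    System.out.println("du length (logical):          " + cs.getLength());
    System.out.println("du spaceConsumed (replicas):  " + cs.getSpaceConsumed());
  }
}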