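Hadoop Source Code Analysis: The FileSystem Abstract File System

Hadoop provides an abstract file system model, FileSystem; HDFS is one implementation of it.

FileSystem is the abstract parent class of all file systems in Hadoop. It defines the basic characteristics and operations that a file system provides.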

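The FileSystem class is in the org.apache.hadoop.fs package. In Eclipse, press Ctrl+Shift+T to search for it; when prompted, attach the source package hadoop-hdfs-client-3.0.0-sources.jar (the class itself is compiled into the hadoop-common module).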

I. Member variables

  1. The configuration key for the default file system that Hadoop uses; its default value is set in core-default.xml

public static final String FS_DEFAULT_NAME_KEY = CommonConfigurationKeys.FS_DEFAULT_NAME_KEY;  // the key is "fs.defaultFS"

  2. The file system cache

/** FileSystem cache. */
  static final Cache CACHE = new Cache();

  3. The key under which this file system instance is stored in the Cache

  /** The key this instance is stored under in the cache. */
  private Cache.Key key;

  4. Statistics recorded per file system class

  /** Recording statistics per a FileSystem class. */
  private static final Map<Class<? extends FileSystem>, Statistics>
      statisticsTable = new IdentityHashMap<>();

  5. The statistics for this file system

  /**
   * The statistics for this file system.
   */
  protected Statistics statistics;

   6. The set of paths that must be deleted when the file system is closed or the JVM exits

  /**
   * A cache of files that should be deleted when the FileSystem is closed
   * or the JVM is exited.
   */
  private final Set<Path> deleteOnExit = new TreeSet<>();

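II. Inner classes

Inner class Cache: caches file system objects

  1. Hadoop stores file system objects as key-value pairs in a HashMap. The key type, Key, is a static inner class of Cache.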

    private final Map<Key, FileSystem> map = new HashMap<>();

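   2. Build a key from the file system URI and the configuration, then use it to look up a file system instance. If no instance is cached for that key, create and initialize one.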

    FileSystem get(URI uri, Configuration conf) throws IOException{
      Key key = new Key(uri, conf);
      return getInternal(uri, conf, key);
    }

    private FileSystem getInternal(URI uri, Configuration conf, Key key)
        throws IOException{
      FileSystem fs;
      synchronized (this) {
        fs = map.get(key);
      }
      if (fs != null) {
        return fs;
      }

      fs = createFileSystem(uri, conf);
      synchronized (this) { // refetch the lock again
        FileSystem oldfs = map.get(key);
        if (oldfs != null) { // a file system is created while lock is releasing
          fs.close(); // close the new file system
          return oldfs;  // return the old file system
        }

        // now insert the new file system into the map
        if (map.isEmpty()
                && !ShutdownHookManager.get().isShutdownInProgress()) {
          ShutdownHookManager.get().addShutdownHook(clientFinalizer, SHUTDOWN_HOOK_PRIORITY);
        }
        fs.key = key;
        map.put(key, fs);
        if (conf.getBoolean(
            FS_AUTOMATIC_CLOSE_KEY, FS_AUTOMATIC_CLOSE_DEFAULT)) {
          toAutoClose.add(key);
        }
        return fs;
      }
    }
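
The lookup above holds the lock only for the map accesses and runs the potentially slow creation step outside it, then re-checks the map before inserting. A minimal JDK-only sketch of that pattern follows. The class name CreateOutsideLockCache, the factory and the closer are hypothetical stand-ins for illustration; this is not Hadoop code and it leaves out the shutdown hook and the auto-close bookkeeping.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Consumer;
import java.util.function.Function;

// Hypothetical stand-in for the cache shown above; not Hadoop code.
class CreateOutsideLockCache<K, V> {
  private final Map<K, V> map = new HashMap<>();
  private final Function<K, V> factory;  // assumed to be slow, like createFileSystem
  private final Consumer<V> closer;      // disposes of an instance that lost the race

  CreateOutsideLockCache(Function<K, V> factory, Consumer<V> closer) {
    this.factory = factory;
    this.closer = closer;
  }

  V get(K key) {
    V cached;
    synchronized (this) {            // first check, under the lock
      cached = map.get(key);
    }
    if (cached != null) {
      return cached;
    }

    V created = factory.apply(key);  // creation runs without holding the lock
    synchronized (this) {            // re-acquire the lock and check again
      V winner = map.get(key);
      if (winner != null) {          // another thread inserted first
        closer.accept(created);      // discard the duplicate
        return winner;
      }
      map.put(key, created);
      return created;
    }
  }
}
```

The trade-off in this design is that two threads may both create an instance for the same key; the loser's instance is closed and thrown away, which is the price for not blocking every caller while one instance is being created.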

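   3. Remove the file system instance stored under the given key. The entry is dropped only if the cached instance is the same object as fs; otherwise the cached instance is put back.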

    synchronized void remove(Key key, FileSystem fs) {
      FileSystem cachedFs = map.remove(key);
      if (fs == cachedFs) {
        toAutoClose.remove(key);
      } else if (cachedFs != null) {
        map.put(key, cachedFs);
      }
    }

  4. Remove all file system objects from the cache and close them. When onlyAutomatic is true, only the instances marked for automatic closing are closed.

    /**
     * Close all FileSystem instances in the Cache.
     * @param onlyAutomatic only close those that are marked for automatic closing
     * @throws IOException a problem arose closing one or more FileSystem.
     */
    synchronized void closeAll(boolean onlyAutomatic) throws IOException {
      List<IOException> exceptions = new ArrayList<>();

      // Make a copy of the keys in the map since we'll be modifying
      // the map while iterating over it, which isn't safe.
      List<Key> keys = new ArrayList<>();
      keys.addAll(map.keySet());

      for (Key key : keys) {
        final FileSystem fs = map.get(key);

        if (onlyAutomatic && !toAutoClose.contains(key)) {
          continue;
        }

        //remove from cache
        map.remove(key);
        toAutoClose.remove(key);

        if (fs != null) {
          try {
            fs.close();
          }
          catch(IOException ioe) {
            exceptions.add(ioe);
          }
        }
      }

      if (!exceptions.isEmpty()) {
        throw MultipleIOException.createIOException(exceptions);
      }
    }
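
closeAll keeps going after a failed close() and reports every failure at the end instead of stopping at the first one. The sketch below shows the same collect-then-throw idea with JDK types only. CloseAllSketch is a hypothetical name, and the use of suppressed exceptions is my substitute for Hadoop's MultipleIOException, whose internals are not shown in this article.

```java
import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration of "close everything, then report all failures".
class CloseAllSketch {
  static void closeAll(List<? extends Closeable> resources) throws IOException {
    List<IOException> failures = new ArrayList<>();
    for (Closeable resource : resources) {
      try {
        resource.close();
      } catch (IOException e) {
        failures.add(e);  // remember the failure and keep closing the rest
      }
    }
    if (!failures.isEmpty()) {
      // Substitute for MultipleIOException: one summary exception
      // that carries each individual failure as a suppressed exception.
      IOException summary =
          new IOException(failures.size() + " resource(s) failed to close");
      for (IOException failure : failures) {
        summary.addSuppressed(failure);
      }
      throw summary;
    }
  }
}
```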

   5. Close the file systems that belong to the given UserGroupInformation

    synchronized void closeAll(UserGroupInformation ugi) throws IOException {
      List<FileSystem> targetFSList = new ArrayList<>(map.entrySet().size());
      //Make a pass over the list and collect the FileSystems to close
      //we cannot close inline since close() removes the entry from the Map
      for (Map.Entry<Key, FileSystem> entry : map.entrySet()) {
        final Key key = entry.getKey();
        final FileSystem fs = entry.getValue();
        if (ugi.equals(key.ugi) && fs != null) {
          targetFSList.add(fs);
        }
      }
      List<IOException> exceptions = new ArrayList<>();
      //now make a pass over the target list and close each
      for (FileSystem fs : targetFSList) {
        try {
          fs.close();
        }
        catch(IOException ioe) {
          exceptions.add(ioe);
        }
      }
      if (!exceptions.isEmpty()) {
        throw MultipleIOException.createIOException(exceptions);
      }
    }

 

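Inner class Key (a static inner class of Cache):

  1. Member variables

      final String scheme;    // URI scheme
      final String authority;    // URI authority
      final UserGroupInformation ugi;    // user and group information
      final long unique;    // assigned in the constructor below; distinguishes otherwise identical keys

  2. Two constructors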

      Key(URI uri, Configuration conf) throws IOException {
        this(uri, conf, 0);
      }

      Key(URI uri, Configuration conf, long unique) throws IOException {
        scheme = uri.getScheme()==null ?
            "" : StringUtils.toLowerCase(uri.getScheme());
        authority = uri.getAuthority()==null ?
            "" : StringUtils.toLowerCase(uri.getAuthority());
        this.unique = unique;

        this.ugi = UserGroupInformation.getCurrentUser();
      }
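
The constructor lower-cases the scheme and authority and ignores the path, so URIs that differ only in letter case or in path map to the same cache entry for the same user. A minimal sketch of such a key follows. SchemeAuthorityKey is a hypothetical class, the user is simplified to a plain String, and the equals/hashCode definitions are my assumption about how a key of this kind would be compared, since the listing above does not show them.

```java
import java.net.URI;
import java.util.Locale;
import java.util.Objects;

// Hypothetical cache key; the user is a plain String instead of a UserGroupInformation.
final class SchemeAuthorityKey {
  final String scheme;
  final String authority;
  final String user;
  final long unique;

  SchemeAuthorityKey(URI uri, String user, long unique) {
    // A missing component becomes "", and case differences are normalized away.
    this.scheme = uri.getScheme() == null
        ? "" : uri.getScheme().toLowerCase(Locale.ROOT);
    this.authority = uri.getAuthority() == null
        ? "" : uri.getAuthority().toLowerCase(Locale.ROOT);
    this.user = user;
    this.unique = unique;
  }

  @Override
  public boolean equals(Object other) {
    if (this == other) {
      return true;
    }
    if (!(other instanceof SchemeAuthorityKey)) {
      return false;
    }
    SchemeAuthorityKey that = (SchemeAuthorityKey) other;
    return scheme.equals(that.scheme)
        && authority.equals(that.authority)
        && Objects.equals(user, that.user)
        && unique == that.unique;
  }

  @Override
  public int hashCode() {
    return Objects.hash(scheme, authority, user, unique);
  }
}
```

With this sketch, keys built from HDFS://NameNode:8020/a and hdfs://namenode:8020/b/c for the same user are equal, while the same URI for a different user yields a different key.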

 

Inner class Statistics: file system statistics

   1. Member variables

      private final String scheme;    // scheme of the file system URI
      private volatile long bytesRead;     // number of bytes read
      private volatile long bytesWritten;    // number of bytes written
      private volatile int readOps;     // number of read operations
      private volatile int largeReadOps;   // number of large read operations
      private volatile int writeOps;    // number of write operations

III. Member methods

The abstract methods are as follows:

   1. Get the URI that uniquely identifies this file system

  /**
   * Returns a URI which identifies this FileSystem.
   *
   * @return the URI of this filesystem.
   */
  public abstract URI getUri();

  2. Open an FSDataInputStream for the file at the given Path

  /**
   * Opens an FSDataInputStream at the indicated Path.
   * @param f the file name to open
   * @param bufferSize the size of the buffer to be used.
   * @throws IOException IO failure
   */
  public abstract FSDataInputStream open(Path f, int bufferSize)
    throws IOException;

  3. Create an FSDataOutputStream at the given path with write-progress reporting

  /**
   * Create an FSDataOutputStream at the indicated Path with write-progress
   * reporting.
   * @param f the file name to open
   * @param permission file permission
   * @param overwrite if a file with this name already exists, then if true,
   *   the file will be overwritten, and if false an error will be thrown.
   * @param bufferSize the size of the buffer to be used.
   * @param replication required block replication for the file.
   * @param blockSize block size
   * @param progress the progress reporter
   * @throws IOException IO failure
   * @see #setPermission(Path, FsPermission)
   */
  public abstract FSDataOutputStream create(Path f,
      FsPermission permission,
      boolean overwrite,
      int bufferSize,
      short replication,
      long blockSize,
      Progressable progress) throws IOException;

  4. Append to an existing file

  /**
   * Append to an existing file (optional operation).
   * @param f the existing file to be appended.
   * @param bufferSize the size of the buffer to be used.
   * @param progress for reporting progress if it is not null.
   * @throws IOException IO failure
   * @throws UnsupportedOperationException if the operation is unsupported
   *         (default).
   */
  public abstract FSDataOutputStream append(Path f, int bufferSize,
      Progressable progress) throws IOException;

  5. Rename the path src to the path dst

  /**
   * Renames Path src to Path dst.
   * @param src path to be renamed
   * @param dst new path after rename
   * @throws IOException on failure
   * @return true if rename is successful
   */
  public abstract boolean rename(Path src, Path dst) throws IOException;

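  6. Delete a file or directory. If the path is a directory and recursive is true, the directory is deleted recursively; otherwise an exception is thrown. For a file, recursive may be either true or false.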

  /** Delete a file.
   *
   * @param f the path to delete.
   * @param recursive if path is a directory and set to
   * true, the directory is deleted else throws an exception. In
   * case of a file the recursive can be set to either true or false.
   * @return  true if delete is successful else false.
   * @throws IOException IO failure
   */
  public abstract boolean delete(Path f, boolean recursive) throws IOException;

  7. List the statuses of the files and subdirectories in a directory

  /**
   * List the statuses of the files/directories in the given path if the path is
   * a directory.
   * <p>
   * Does not guarantee to return the List of files/directories status in a
   * sorted order.
   * <p>
   * Will not return null. Expect IOException upon access error.
   * @param f given path
   * @return the statuses of the files/directories in the given patch
   * @throws FileNotFoundException when the path does not exist
   * @throws IOException see specific implementation
   */
  public abstract FileStatus[] listStatus(Path f) throws FileNotFoundException,IOException;

  8. Set the current working directory of the file system

  /**
   * Set the current working directory for the given FileSystem. All relative
   * paths will be resolved relative to it.
   *
   * @param new_dir Path of new working directory
   */
  public abstract void setWorkingDirectory(Path new_dir);

  9. Get the current working directory of the file system

  /**
   * Get the current working directory for the given FileSystem
   * @return the directory pathname
   */
  public abstract Path getWorkingDirectory();

  10. Create a directory (and any missing parents) with the given permission

  /**
   * Make the given file and all non-existent parents into
   * directories. Has roughly the semantics of Unix @{code mkdir -p}.
   * Existence of the directory hierarchy is not an error.
   * @param f path to create
   * @param permission to apply to f
   * @throws IOException IO failure
   */
  public abstract boolean mkdirs(Path f, FsPermission permission
      ) throws IOException;

  11. Get the status of the given file or directory

  /**
   * Return a file status object that represents the path.
   * @param f The path we want information from
   * @return a FileStatus object
   * @throws FileNotFoundException when the path does not exist
   * @throws IOException see specific implementation
   */
  public abstract FileStatus getFileStatus(Path f) throws IOException;

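Those are some of FileSystem's abstract methods.

The following are some of the overloads that FileSystem provides on top of them.

  1. Overload of open. IO_FILE_BUFFER_SIZE_KEY has the value "io.file.buffer.size" and IO_FILE_BUFFER_SIZE_DEFAULT has the value 4096, i.e. 4 KB.
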

  /**
   * Opens an FSDataInputStream at the indicated Path.
   * @param f the file to open
   * @throws IOException IO failure
   */
  public FSDataInputStream open(Path f) throws IOException {
    return open(f, getConf().getInt(IO_FILE_BUFFER_SIZE_KEY,
        IO_FILE_BUFFER_SIZE_DEFAULT));
  }
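
The convenience overloads in this section all follow one shape: a concrete method fills in defaults, some of them read from the configuration, and delegates to the abstract method that subclasses implement. A minimal JDK-only sketch of that shape follows. BufferedOpener is a hypothetical class, java.util.Properties stands in for Hadoop's Configuration, and the String return value stands in for a stream.

```java
import java.util.Properties;

// Hypothetical base class; Properties stands in for Hadoop's Configuration.
abstract class BufferedOpener {
  static final String IO_FILE_BUFFER_SIZE_KEY = "io.file.buffer.size";
  static final int IO_FILE_BUFFER_SIZE_DEFAULT = 4096;

  private final Properties conf;

  BufferedOpener(Properties conf) {
    this.conf = conf;
  }

  // The one method a subclass has to implement.
  abstract String open(String path, int bufferSize);

  // Convenience overload: look up the buffer size at call time, then delegate.
  String open(String path) {
    String configured = conf.getProperty(
        IO_FILE_BUFFER_SIZE_KEY, String.valueOf(IO_FILE_BUFFER_SIZE_DEFAULT));
    return open(path, Integer.parseInt(configured));
  }
}
```

Because the default is resolved on every call rather than once in the constructor, a later change to the configuration takes effect for subsequent calls in this sketch.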

   2. Overloads of create

    2.1 Overwrites an existing file by default.

  /**
   * Create an FSDataOutputStream at the indicated Path.
   * Files are overwritten by default.
   * @param f the file to create
   * @throws IOException IO failure
   */
  public FSDataOutputStream create(Path f) throws IOException {
    return create(f, true);
  }

       2.1.1 Creates the FSDataOutputStream with the configured buffer size and the default replication factor and block size for the path

  /**
   * Create an FSDataOutputStream at the indicated Path.
   * @param f the file to create
   * @param overwrite if a file with this name already exists, then if true,
   *   the file will be overwritten, and if false an exception will be thrown.
   * @throws IOException IO failure
   */
  public FSDataOutputStream create(Path f, boolean overwrite)
      throws IOException {
    return create(f, overwrite,
                  getConf().getInt(IO_FILE_BUFFER_SIZE_KEY,
                      IO_FILE_BUFFER_SIZE_DEFAULT),
                  getDefaultReplication(f),
                  getDefaultBlockSize(f));
  }

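  In this base class the default block replication factor is 1; getDefaultReplication(Path) simply delegates to the deprecated no-argument method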

  /**
   * Get the default replication.
   * @return the replication; the default value is "1".
   * @deprecated use {@link #getDefaultReplication(Path)} instead
   */
  @Deprecated
  public short getDefaultReplication() { return 1; }

  /**
   * Get the default replication for a path.
   * The given path will be used to locate the actual FileSystem to query.
   * The full path does not have to exist.
   * @param path of the file
   * @return default replication for the path's filesystem
   */
  public short getDefaultReplication(Path path) {
    return getDefaultReplication();
  }

   In this base class the default block size is 32 MB unless fs.local.block.size is set

  /**
   * Return the number of bytes that large input files should be optimally
   * be split into to minimize I/O time.
   * @deprecated use {@link #getDefaultBlockSize(Path)} instead
   */
  @Deprecated
  public long getDefaultBlockSize() {
    // default to 32MB: large enough to minimize the impact of seeks
    return getConf().getLong("fs.local.block.size", 32 * 1024 * 1024);
  }

  /**
   * Return the number of bytes that large input files should be optimally
   * be split into to minimize I/O time.  The given path will be used to
   * locate the actual filesystem.  The full path does not have to exist.
   * @param f path of file
   * @return the default block size for the path's filesystem
   */
  public long getDefaultBlockSize(Path f) {
    return getDefaultBlockSize();
  }

   2.2 Creates the FSDataOutputStream and reports write progress through the supplied Progressable

  /**
   * Create an FSDataOutputStream at the indicated Path with write-progress
   * reporting.
   * Files are overwritten by default.
   * @param f the file to create
   * @param progress to report progress
   * @throws IOException IO failure 
   */
  public FSDataOutputStream create(Path f, Progressable progress)
      throws IOException {
    return create(f, true,
                  getConf().getInt(IO_FILE_BUFFER_SIZE_KEY,
                      IO_FILE_BUFFER_SIZE_DEFAULT),
                  getDefaultReplication(f),
                  getDefaultBlockSize(f), progress);
  }

 

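   The remaining create overloads mostly create an FSDataOutputStream from specified or default values for the buffer size, block replication, and block size, or create a file on an explicitly supplied FileSystem. They are not listed one by one here.

   3. Overloads of append

    3.1 Opens the file with the default buffer size and appends to the end of it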

  /**
   * Append to an existing file (optional operation).
   * Same as
   * {@code append(f, getConf().getInt(IO_FILE_BUFFER_SIZE_KEY,
   *     IO_FILE_BUFFER_SIZE_DEFAULT), null)}
   * @param f the existing file to be appended.
   * @throws IOException IO failure
   * @throws UnsupportedOperationException if the operation is unsupported
   *         (default).
   */
  public FSDataOutputStream append(Path f) throws IOException {
    return append(f, getConf().getInt(IO_FILE_BUFFER_SIZE_KEY,
        IO_FILE_BUFFER_SIZE_DEFAULT), null);
  }

     3.2 Opens the file with a caller-specified buffer size and appends to the end of it

  /**
   * Append to an existing file (optional operation).
   * Same as append(f, bufferSize, null).
   * @param f the existing file to be appended.
   * @param bufferSize the size of the buffer to be used.
   * @throws IOException IO failure
   * @throws UnsupportedOperationException if the operation is unsupported
   *         (default).
   */
  public FSDataOutputStream append(Path f, int bufferSize) throws IOException {
    return append(f, bufferSize, null);
  }

 

  4. Overloads of mkdirs

    4.1 Creates the directory with the default directory permission, which is 00777

  /**
   * Call {@link #mkdirs(Path, FsPermission)} with default permission.
   * @param f path
   * @return true if the directory was created
   * @throws IOException IO failure
   */
  public boolean mkdirs(Path f) throws IOException {
    return mkdirs(f, FsPermission.getDirDefault());
  }

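    4.2 Creates a directory on an explicitly supplied FileSystem. It first creates the directory at the given path with the default permission, then sets its permission to the one the caller specified.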

  /**
   * Create a directory with the provided permission.
   * The permission of the directory is set to be the provided permission as in
   * setPermission, not permission&~umask
   *
   * @see #create(FileSystem, Path, FsPermission)
   *
   * @param fs FileSystem handle
   * @param dir the name of the directory to be created
   * @param permission the permission of the directory
   * @return true if the directory creation succeeds; false otherwise
   * @throws IOException A problem creating the directories.
   */
  public static boolean mkdirs(FileSystem fs, Path dir, FsPermission permission)
      throws IOException {
    // create the directory using the default permission
    boolean result = fs.mkdirs(dir);
    // set its permission to be the supplied one
    fs.setPermission(dir, permission);
    return result;
  }
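
The Javadoc above contrasts the supplied permission with permission&~umask. As a worked illustration of that expression only, here is a small sketch; the helper name applyUmask and the sample values are mine, and whether a particular FileSystem applies a umask inside mkdirs is implementation-specific.

```java
// Hypothetical helper showing what "permission & ~umask" evaluates to.
class UmaskSketch {
  // Clears every permission bit that is set in the umask;
  // the final mask keeps only the nine rwx bits.
  static int applyUmask(int permission, int umask) {
    return permission & ~umask & 0777;
  }

  public static void main(String[] args) {
    // 0777 with umask 022 loses the group and other write bits.
    System.out.println(Integer.toOctalString(applyUmask(0777, 0022)));  // prints 755
  }
}
```

Calling setPermission after the directory exists, as the static mkdirs does, is what lets the caller end up with exactly the requested bits rather than a masked version of them.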

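    5. Overloads of listStatus

      5.1 Filters the entries under the given path with the supplied filter and adds the statuses of the accepted entries to the caller-provided ArrayList.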

  /**
   * Filter files/directories in the given path using the user-supplied path
   * filter. Results are added to the given array <code>results</code>.
   * @throws FileNotFoundException when the path does not exist
   * @throws IOException see specific implementation
   */
  private void listStatus(ArrayList<FileStatus> results, Path f,
      PathFilter filter) throws FileNotFoundException, IOException {
    FileStatus listing[] = listStatus(f);
    Preconditions.checkNotNull(listing, "listStatus should not return NULL");
    for (int i = 0; i < listing.length; i++) {
      if (filter.accept(listing[i].getPath())) {
        results.add(listing[i]);
      }
    }
  }

      5.2 Differs from the method above in that the list holding the statuses is created internally rather than supplied by the caller.

  public FileStatus[] listStatus(Path f, PathFilter filter)
                                   throws FileNotFoundException, IOException {
    ArrayList<FileStatus> results = new ArrayList<>();
    listStatus(results, f, filter);
    return results.toArray(new FileStatus[results.size()]);
  }

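        5.3 Filters the entries under each of the given paths with the default filter and collects the statuses of the accepted entries in an internally created list.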

  /**
   * Filter files/directories in the given list of paths using default
   * path filter.
   * <p>
   * Does not guarantee to return the List of files/directories status in a
   * sorted order.
   *
   * @param files
   *          a list of paths
   * @return a list of statuses for the files under the given paths after
   *         applying the filter default Path filter
   * @throws FileNotFoundException when the path does not exist
   * @throws IOException see specific implementation
   */
  public FileStatus[] listStatus(Path[] files)
      throws FileNotFoundException, IOException {
    return listStatus(files, DEFAULT_FILTER);
  }

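     5.4 Filters the entries under each of the given paths with the supplied filter and collects the statuses of the accepted entries in an internally created list.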

  public FileStatus[] listStatus(Path[] files, PathFilter filter)
      throws FileNotFoundException, IOException {
    ArrayList<FileStatus> results = new ArrayList<FileStatus>();
    for (int i = 0; i < files.length; i++) {
      listStatus(results, files[i], filter);
    }
    return results.toArray(new FileStatus[results.size()]);
  }
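
The four listStatus variants above share one private helper that appends accepted entries to a result list; the public overloads only decide which filter to use and how many directories to walk. A JDK-only sketch of that layering follows. FilteredListingSketch is hypothetical: a Map stands in for the file system, Strings stand in for FileStatus objects, and ACCEPT_ALL assumes that the default filter accepts every path.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;

// Hypothetical illustration; a Map of directory -> entries replaces the file system.
class FilteredListingSketch {
  // Assumed behaviour of a default filter: accept everything.
  static final Predicate<String> ACCEPT_ALL = name -> true;

  // Overload without a filter: delegate with the default one.
  static String[] list(Map<String, List<String>> tree, String[] dirs) {
    return list(tree, dirs, ACCEPT_ALL);
  }

  // Walk several directories and accumulate the accepted entries in one list.
  static String[] list(Map<String, List<String>> tree, String[] dirs,
                       Predicate<String> filter) {
    List<String> results = new ArrayList<>();
    for (String dir : dirs) {
      addMatching(results, tree.get(dir), filter);
    }
    return results.toArray(new String[0]);
  }

  // Shared helper: append the entries of one listing that pass the filter.
  private static void addMatching(List<String> results, List<String> listing,
                                  Predicate<String> filter) {
    if (listing == null) {
      throw new IllegalArgumentException("no such directory");
    }
    for (String entry : listing) {
      if (filter.test(entry)) {
        results.add(entry);
      }
    }
  }
}
```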

  6. Overloads of copyFromLocalFile

    6.1 Copies the file at src on the local disk to the path dst and keeps the source file

  /**
   * The src file is on the local disk.  Add it to filesystem at
   * the given dst name and the source is kept intact afterwards
   * @param src path
   * @param dst path
   * @throws IOException IO failure
   */
  public void copyFromLocalFile(Path src, Path dst)
    throws IOException {
    copyFromLocalFile(false, src, dst);
  }

    6.2 The caller-supplied delSrc value decides whether the source file is deleted

  /**
   * The src file is on the local disk.  Add it to the filesystem at
   * the given dst name.
   * delSrc indicates if the source should be removed
   * @param delSrc whether to delete the src
   * @param src path
   * @param dst path
   */
  public void copyFromLocalFile(boolean delSrc, Path src, Path dst)
    throws IOException {
    copyFromLocalFile(delSrc, true, src, dst);
  }

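    6.3 The delSrc parameter decides whether the source files are deleted, and the overwrite parameter decides whether an existing target under dst is overwritten.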

  /**
   * The src files are on the local disk.  Add it to the filesystem at
   * the given dst name.
   * delSrc indicates if the source should be removed
   * @param delSrc whether to delete the src
   * @param overwrite whether to overwrite an existing file
   * @param srcs array of paths which are source
   * @param dst path
   * @throws IOException IO failure
   */
  public void copyFromLocalFile(boolean delSrc, boolean overwrite,
                                Path[] srcs, Path dst)
    throws IOException {
    Configuration conf = getConf();
    FileUtil.copy(getLocal(conf), srcs, this, dst, delSrc, overwrite, conf);
  }

  7. moveToLocalFile, implemented by calling copyToLocalFile with delSrc set to true

  /**
   * Copy a file to the local filesystem, then delete it from the
   * remote filesystem (if successfully copied).
   * @param src path src file in the remote filesystem
   * @param dst path local destination
   * @throws IOException IO failure
   */
  public void moveToLocalFile(Path src, Path dst) throws IOException {
    copyToLocalFile(true, src, dst);
  }

  8. Copies the file at src in the remote file system to the local path dst; the delSrc parameter decides whether the source file is deleted

  /**
   * Copy it a file from a remote filesystem to the local one.
   * delSrc indicates if the src will be removed or not.
   * @param delSrc whether to delete the src
   * @param src path src file in the remote filesystem
   * @param dst path local destination
   * @throws IOException IO failure
   */
  public void copyToLocalFile(boolean delSrc, Path src, Path dst)
    throws IOException {
    copyToLocalFile(delSrc, src, dst, false);
  }