Hadoop provides an abstract file system model, FileSystem, and HDFS is one of its implementations.
FileSystem is the abstract parent class of every file system in Hadoop. It defines the basic characteristics and operations that a file system must have.
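The FileSystem class is in the org.apache.hadoop.fs package. In Eclipse, press Ctrl+Shift+T to search for it and attach the source jar when prompted. FileSystem ships in the hadoop-common module, so the matching source jar is hadoop-common-3.0.0-sources.jar, not hadoop-hdfs-client-3.0.0-sources.jar.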
Main member variables
1. The configuration key for Hadoop's default file system (its default value is defined in core-default.xml):
```java
public static final String FS_DEFAULT_NAME_KEY =
    CommonConfigurationKeys.FS_DEFAULT_NAME_KEY; // value: "fs.defaultFS"
```
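The sketch below shows roughly what this key is used for. A path without a scheme is placed under the URI configured in fs.defaultFS, which defaults to file:/// in core-default.xml. It is a simplified, stdlib-only model. DefaultFsDemo, defaultUri and qualify are hypothetical names, not Hadoop API, and a plain Map stands in for Configuration.

```java
import java.net.URI;
import java.util.Map;

// Hypothetical, stdlib-only model: a Map stands in for Hadoop's Configuration.
final class DefaultFsDemo {
    static final String FS_DEFAULT_NAME_KEY = "fs.defaultFS";
    static final String FS_DEFAULT_NAME_DEFAULT = "file:///"; // default shipped in core-default.xml

    /** Returns the default file system URI, falling back to the local file system. */
    static URI defaultUri(Map<String, String> conf) {
        return URI.create(conf.getOrDefault(FS_DEFAULT_NAME_KEY, FS_DEFAULT_NAME_DEFAULT));
    }

    /**
     * Qualifies an absolute path: a path that already has a scheme is returned as-is,
     * otherwise it gets the scheme and authority of the default file system.
     */
    static URI qualify(String absolutePath, Map<String, String> conf) {
        URI p = URI.create(absolutePath);
        if (p.getScheme() != null) {
            return p;
        }
        if (!absolutePath.startsWith("/")) {
            throw new IllegalArgumentException("expected an absolute path: " + absolutePath);
        }
        return defaultUri(conf).resolve(p);
    }
}
```

For example, with fs.defaultFS set to hdfs://namenode:8020, the path /user/alice qualifies to hdfs://namenode:8020/user/alice. A path such as s3a://bucket/key keeps its own scheme.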
2. The file system cache
```java
/** FileSystem cache. */
static final Cache CACHE = new Cache();
```
3. The key under which this instance is stored in the Cache
```java
/** The key this instance is stored under in the cache. */
private Cache.Key key;
```
4. Statistics recorded per FileSystem class
```java
/** Recording statistics per a FileSystem class. */
private static final Map<Class<? extends FileSystem>, Statistics>
    statisticsTable = new IdentityHashMap<>();
```
5. The statistics of this file system
```java
/**
 * The statistics for this file system.
 */
protected Statistics statistics;
```
6. The paths that must be deleted when the file system is closed or the JVM exits
```java
/**
 * A cache of files that should be deleted when the FileSystem is closed
 * or the JVM is exited.
 */
private final Set<Path> deleteOnExit = new TreeSet<>();
```
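As far as the Hadoop source goes, FileSystem.deleteOnExit(Path) adds an existing path to this set, and close() deletes every registered path. The JVM-exit case is covered because the cache's shutdown hook closes the cached file systems. The following is a hypothetical java.nio model of that bookkeeping. DeleteOnCloseRegistry is not Hadoop API.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;
import java.util.stream.Stream;

// Hypothetical model of FileSystem's deleteOnExit bookkeeping, using java.nio paths.
final class DeleteOnCloseRegistry implements AutoCloseable {
    // Sorted set of paths, mirroring the TreeSet<Path> field above.
    private final Set<Path> deleteOnExit = new TreeSet<>();

    /** Registers an existing path for deletion; returns false if it does not exist. */
    synchronized boolean deleteOnExit(Path p) {
        if (!Files.exists(p)) {
            return false;
        }
        return deleteOnExit.add(p);
    }

    /** Deletes every registered path (recursively) and forgets them. */
    @Override
    public synchronized void close() throws IOException {
        for (Path p : deleteOnExit) {
            deleteRecursively(p);
        }
        deleteOnExit.clear();
    }

    private static void deleteRecursively(Path root) throws IOException {
        if (!Files.exists(root)) {
            return; // already gone, e.g. removed together with a registered parent
        }
        List<Path> paths = new ArrayList<>();
        try (Stream<Path> walk = Files.walk(root)) {
            walk.forEach(paths::add); // pre-order: every parent precedes its children
        }
        Collections.reverse(paths); // now children precede their parents
        for (Path p : paths) {
            Files.delete(p);
        }
    }
}
```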
Inner class Cache: caches FileSystem objects
1. Hadoop stores FileSystem objects as key-value pairs in a HashMap. The keys are instances of Key, a static nested class of Cache.
```java
private final Map<Key, FileSystem> map = new HashMap<>();
```
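2. Build a Key from the file system's URI and the configuration, then use it to look up a FileSystem instance. If no instance is cached, a new file system is created and initialized. Creation happens outside the lock, so the map is checked again afterwards. If another thread cached an instance in the meantime, the new one is closed and the existing one is returned.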
```java
FileSystem get(URI uri, Configuration conf) throws IOException {
  Key key = new Key(uri, conf);
  return getInternal(uri, conf, key);
}

private FileSystem getInternal(URI uri, Configuration conf, Key key)
    throws IOException {
  FileSystem fs;
  synchronized (this) {
    fs = map.get(key);
  }
  if (fs != null) {
    return fs;
  }

  fs = createFileSystem(uri, conf);
  synchronized (this) { // refetch the lock again
    FileSystem oldfs = map.get(key);
    if (oldfs != null) { // a file system is created while lock is releasing
      fs.close(); // close the new file system
      return oldfs;  // return the old file system
    }

    // now insert the new file system into the map
    if (map.isEmpty()
            && !ShutdownHookManager.get().isShutdownInProgress()) {
      ShutdownHookManager.get().addShutdownHook(clientFinalizer, SHUTDOWN_HOOK_PRIORITY);
    }
    fs.key = key;
    map.put(key, fs);
    if (conf.getBoolean(
        FS_AUTOMATIC_CLOSE_KEY, FS_AUTOMATIC_CLOSE_DEFAULT)) {
      toAutoClose.add(key);
    }
    return fs;
  }
}
```
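The core of getInternal is a pattern: look up under the lock, create outside it, then re-check before publishing. The following is a simplified, hypothetical model of that pattern. SimpleFsCache is not Hadoop API, and the shutdown-hook and auto-close bookkeeping are left out.

```java
import java.io.Closeable;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Supplier;

// Hypothetical, simplified model of Cache.getInternal: look up under the lock,
// create outside it, then re-check before publishing.
final class SimpleFsCache<K, V extends Closeable> {
    private final Map<K, V> map = new HashMap<>();

    V get(K key, Supplier<V> factory) throws IOException {
        V fs;
        synchronized (this) {
            fs = map.get(key);
        }
        if (fs != null) {
            return fs;               // fast path: already cached
        }
        fs = factory.get();          // possibly slow, so done without holding the lock
        synchronized (this) {
            V old = map.get(key);    // another thread may have cached one meanwhile
            if (old != null) {
                fs.close();          // discard ours, keep the instance others already see
                return old;
            }
            map.put(key, fs);
            return fs;
        }
    }

    synchronized int size() {
        return map.size();
    }
}
```

Creating outside the lock keeps a slow initialization (for example, connecting to a NameNode) from blocking lookups for other keys. The cost is that two threads can occasionally both create an instance and one of them is thrown away.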
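3. Remove the FileSystem instance mapped to the given key. The entry is removed only if it still refers to this exact instance. If a different instance is cached under the key, it is put back.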
```java
synchronized void remove(Key key, FileSystem fs) {
  FileSystem cachedFs = map.remove(key);
  if (fs == cachedFs) {
    toAutoClose.remove(key);
  } else if (cachedFs != null) {
    map.put(key, cachedFs);
  }
}
```
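The identity check matters because an old instance that is being closed must not evict a newer instance cached under the same key. Below is a hypothetical minimal model. IdentityRemoveCache is not Hadoop API. It compares with `==`, as the code above does, rather than with `Map.remove(key, value)`, which would use `equals`.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical model of Cache.remove: evict the entry only if it still maps to this exact instance.
final class IdentityRemoveCache<K, V> {
    private final Map<K, V> map = new HashMap<>();

    synchronized void put(K key, V value) {
        map.put(key, value);
    }

    synchronized V get(K key) {
        return map.get(key);
    }

    synchronized void remove(K key, V fs) {
        V cached = map.remove(key);
        if (cached != null && cached != fs) {
            map.put(key, cached); // a different instance is cached under this key: restore it
        }
    }
}
```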
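4. Remove all FileSystem objects from the cache and close them. If onlyAutomatic is true, only the instances marked for automatic closing are closed. Close failures are collected and reported together after every instance has been tried.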
```java
/**
 * Close all FileSystem instances in the Cache.
 * @param onlyAutomatic only close those that are marked for automatic closing
 * @throws IOException a problem arose closing one or more FileSystem.
 */
synchronized void closeAll(boolean onlyAutomatic) throws IOException {
  List<IOException> exceptions = new ArrayList<>();

  // Make a copy of the keys in the map since we'll be modifying
  // the map while iterating over it, which isn't safe.
  List<Key> keys = new ArrayList<>();
  keys.addAll(map.keySet());

  for (Key key : keys) {
    final FileSystem fs = map.get(key);

    if (onlyAutomatic && !toAutoClose.contains(key)) {
      continue;
    }

    //remove from cache
    map.remove(key);
    toAutoClose.remove(key);

    if (fs != null) {
      try {
        fs.close();
      }
      catch(IOException ioe) {
        exceptions.add(ioe);
      }
    }
  }

  if (!exceptions.isEmpty()) {
    throw MultipleIOException.createIOException(exceptions);
  }
}
```
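The same "iterate over a copy, keep going on failure, report at the end" approach can be shown without Hadoop. In the hypothetical model below, ClosingCache is not Hadoop API. Extra failures are attached as suppressed exceptions, a swapped-in stand-in for Hadoop's MultipleIOException.

```java
import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical model of Cache.closeAll(onlyAutomatic).
final class ClosingCache<K> {
    final Map<K, Closeable> map = new HashMap<>();
    final Set<K> toAutoClose = new HashSet<>();

    synchronized void closeAll(boolean onlyAutomatic) throws IOException {
        List<IOException> exceptions = new ArrayList<>();
        for (K key : new ArrayList<>(map.keySet())) { // copy: the map is modified below
            if (onlyAutomatic && !toAutoClose.contains(key)) {
                continue;
            }
            Closeable fs = map.remove(key);
            toAutoClose.remove(key);
            if (fs != null) {
                try {
                    fs.close();
                } catch (IOException ioe) {
                    exceptions.add(ioe); // keep closing the others first
                }
            }
        }
        if (!exceptions.isEmpty()) {
            IOException first = exceptions.get(0);
            for (int i = 1; i < exceptions.size(); i++) {
                first.addSuppressed(exceptions.get(i));
            }
            throw first;
        }
    }
}
```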
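5. Close the file systems that belong to a given user (UserGroupInformation). The matching instances are collected first and closed in a second pass, because close() removes the entry from the map.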
```java
synchronized void closeAll(UserGroupInformation ugi) throws IOException {
  List<FileSystem> targetFSList = new ArrayList<>(map.entrySet().size());
  //Make a pass over the list and collect the FileSystems to close
  //we cannot close inline since close() removes the entry from the Map
  for (Map.Entry<Key, FileSystem> entry : map.entrySet()) {
    final Key key = entry.getKey();
    final FileSystem fs = entry.getValue();
    if (ugi.equals(key.ugi) && fs != null) {
      targetFSList.add(fs);
    }
  }
  List<IOException> exceptions = new ArrayList<>();
  //now make a pass over the target list and close each
  for (FileSystem fs : targetFSList) {
    try {
      fs.close();
    }
    catch(IOException ioe) {
      exceptions.add(ioe);
    }
  }
  if (!exceptions.isEmpty()) {
    throw MultipleIOException.createIOException(exceptions);
  }
}
```
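Two passes are needed because closing while iterating would modify the map under the iterator and trigger a ConcurrentModificationException. The hypothetical model below makes that concrete. Each FakeFs removes itself from the cache in close(), like FileSystem.close() does. A user-name String stands in for UserGroupInformation, and the "user@uri" key format is made up for the sketch.

```java
import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of Cache.closeAll(ugi); a String stands in for UserGroupInformation.
final class UserCache {
    final Map<String, FakeFs> map = new HashMap<>(); // key format "user@uri" is made up

    /** A stand-in file system that, like FileSystem.close(), removes itself from the cache. */
    final class FakeFs implements Closeable {
        final String user;
        final String key;

        FakeFs(String user, String uri) {
            this.user = user;
            this.key = user + "@" + uri;
        }

        @Override
        public void close() throws IOException {
            synchronized (UserCache.this) {
                map.remove(key);
            }
        }
    }

    synchronized FakeFs open(String user, String uri) {
        FakeFs fs = new FakeFs(user, uri);
        map.put(fs.key, fs);
        return fs;
    }

    synchronized void closeAll(String user) throws IOException {
        // Pass 1: only collect; closing here would mutate the map being iterated.
        List<FakeFs> targets = new ArrayList<>();
        for (FakeFs fs : map.values()) {
            if (fs.user.equals(user)) {
                targets.add(fs);
            }
        }
        // Pass 2: close each collected instance, remembering failures.
        List<IOException> errors = new ArrayList<>();
        for (FakeFs fs : targets) {
            try {
                fs.close();
            } catch (IOException e) {
                errors.add(e);
            }
        }
        if (!errors.isEmpty()) {
            IOException first = errors.get(0);
            for (int i = 1; i < errors.size(); i++) {
                first.addSuppressed(errors.get(i));
            }
            throw first;
        }
    }
}
```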
Inner class Key:
1. Member variables
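```java
final String scheme;             // URI scheme
final String authority;          // URI authority
final UserGroupInformation ugi;  // user/group information
final long unique;               // an artificial way to make a key unique
```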
2. There are two constructors:
```java
Key(URI uri, Configuration conf) throws IOException {
  this(uri, conf, 0);
}

Key(URI uri, Configuration conf, long unique) throws IOException {
  scheme = uri.getScheme()==null ?
      "" : StringUtils.toLowerCase(uri.getScheme());
  authority = uri.getAuthority()==null ?
      "" : StringUtils.toLowerCase(uri.getAuthority());
  this.unique = unique;
  this.ugi = UserGroupInformation.getCurrentUser();
}
```
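The path component of the URI is not part of the key. As a result, every path on the same cluster, for the same user, maps to the same cached instance. Scheme and authority are compared case-insensitively because they are lowercased. The `unique` field is what lets FileSystem.newInstance obtain a separate, uncached-looking instance. Below is a hypothetical model of the equality rules. CacheKey is not Hadoop API, and a String stands in for UserGroupInformation.

```java
import java.net.URI;
import java.util.Locale;
import java.util.Objects;

// Hypothetical model of Cache.Key; a String stands in for UserGroupInformation.
final class CacheKey {
    final String scheme;
    final String authority;
    final String ugi;
    final long unique;

    CacheKey(URI uri, String ugi) {
        this(uri, ugi, 0);
    }

    CacheKey(URI uri, String ugi, long unique) {
        scheme = uri.getScheme() == null ? "" : uri.getScheme().toLowerCase(Locale.ROOT);
        authority = uri.getAuthority() == null ? "" : uri.getAuthority().toLowerCase(Locale.ROOT);
        this.ugi = ugi;
        this.unique = unique;
    }

    @Override
    public boolean equals(Object o) {
        if (o == this) {
            return true;
        }
        if (!(o instanceof CacheKey)) {
            return false;
        }
        CacheKey k = (CacheKey) o;
        // The path is deliberately ignored: only scheme, authority, user and unique matter.
        return scheme.equals(k.scheme) && authority.equals(k.authority)
                && Objects.equals(ugi, k.ugi) && unique == k.unique;
    }

    @Override
    public int hashCode() {
        return Objects.hash(scheme, authority, ugi, unique);
    }
}
```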
Inner class Statistics: statistics about a file system
1. Member variables
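```java
private final String scheme;         // scheme of the file system URI
private volatile long bytesRead;     // number of bytes read
private volatile long bytesWritten;  // number of bytes written
private volatile int readOps;        // number of read operations
private volatile int largeReadOps;   // number of large read operations (e.g. listing a large directory)
private volatile int writeOps;       // number of write operations
```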
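A volatile field makes its latest value visible to all threads, but `count++` on it is still a non-atomic read-modify-write, so concurrent increments need something more. Hadoop 3.x handles this by keeping these counters in a nested per-thread StatisticsData object and aggregating them when read. The hypothetical sketch below swaps in java.util.concurrent's LongAdder instead. FsStats and its method names are illustrative, not Hadoop API.

```java
import java.util.concurrent.atomic.LongAdder;

// Hypothetical per-scheme counters; LongAdder is a swapped-in technique
// for cheap, thread-safe increments from many threads.
final class FsStats {
    final String scheme;
    private final LongAdder bytesRead = new LongAdder();
    private final LongAdder bytesWritten = new LongAdder();
    private final LongAdder readOps = new LongAdder();
    private final LongAdder largeReadOps = new LongAdder();
    private final LongAdder writeOps = new LongAdder();

    FsStats(String scheme) {
        this.scheme = scheme;
    }

    void incrementBytesRead(long n)    { bytesRead.add(n); }
    void incrementBytesWritten(long n) { bytesWritten.add(n); }
    void incrementReadOps(int n)       { readOps.add(n); }
    void incrementLargeReadOps(int n)  { largeReadOps.add(n); }
    void incrementWriteOps(int n)      { writeOps.add(n); }

    long getBytesRead()    { return bytesRead.sum(); }
    long getBytesWritten() { return bytesWritten.sum(); }
    long getReadOps()      { return readOps.sum(); }
    long getLargeReadOps() { return largeReadOps.sum(); }
    long getWriteOps()     { return writeOps.sum(); }
}
```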
Abstract methods
1. Get the URI that uniquely identifies this file system
```java
/**
 * Returns a URI which identifies this FileSystem.
 *
 * @return the URI of this filesystem.
 */
public abstract URI getUri();
```
2. Open an FSDataInputStream on the file at the given Path
```java
/**
 * Opens an FSDataInputStream at the indicated Path.
 * @param f the file name to open
 * @param bufferSize the size of the buffer to be used.
 * @throws IOException IO failure
 */
public abstract FSDataInputStream open(Path f, int bufferSize)
    throws IOException;
```
3. Create an FSDataOutputStream at the given path, with write-progress reporting
```java
/**
 * Create an FSDataOutputStream at the indicated Path with write-progress
 * reporting.
 * @param f the file name to open
 * @param permission file permission
 * @param overwrite if a file with this name already exists, then if true,
 *   the file will be overwritten, and if false an error will be thrown.
 * @param bufferSize the size of the buffer to be used.
 * @param replication required block replication for the file.
 * @param blockSize block size
 * @param progress the progress reporter
 * @throws IOException IO failure
 * @see #setPermission(Path, FsPermission)
 */
public abstract FSDataOutputStream create(Path f,
    FsPermission permission,
    boolean overwrite,
    int bufferSize,
    short replication,
    long blockSize,
    Progressable progress) throws IOException;
```
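4. Append to an existing file. This is an optional operation; file systems that do not support it throw UnsupportedOperationException.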
```java
/**
 * Append to an existing file (optional operation).
 * @param f the existing file to be appended.
 * @param bufferSize the size of the buffer to be used.
 * @param progress for reporting progress if it is not null.
 * @throws IOException IO failure
 * @throws UnsupportedOperationException if the operation is unsupported
 *         (default).
 */
public abstract FSDataOutputStream append(Path f, int bufferSize,
    Progressable progress) throws IOException;
```
5. Rename the file at src to dst
```java
/**
 * Renames Path src to Path dst.
 * @param src path to be renamed
 * @param dst new path after rename
 * @throws IOException on failure
 * @return true if rename is successful
 */
public abstract boolean rename(Path src, Path dst) throws IOException;
```
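6. Delete a file or directory. If the path is a directory and recursive is true, the directory and everything under it is deleted. If recursive is false, an exception is thrown (implementations typically do this only when the directory is not empty). For a plain file, recursive may be either true or false.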
```java
/** Delete a file.
 *
 * @param f the path to delete.
 * @param recursive if path is a directory and set to
 * true, the directory is deleted else throws an exception. In
 * case of a file the recursive can be set to either true or false.
 * @return  true if delete is successful else false.
 * @throws IOException IO failure
 */
public abstract boolean delete(Path f, boolean recursive) throws IOException;
```
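The delete contract can be tried out on the local disk. The sketch below is a hypothetical java.nio model and not Hadoop code. It returns false when nothing exists, ignores recursive for plain files, refuses non-empty directories when recursive is false, and otherwise deletes children before parents.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Hypothetical local-disk model of delete(Path f, boolean recursive) built on java.nio.
final class LocalDelete {
    static boolean delete(Path f, boolean recursive) throws IOException {
        if (!Files.exists(f)) {
            return false;                    // nothing was deleted
        }
        if (!Files.isDirectory(f)) {
            Files.delete(f);                 // a plain file: 'recursive' is irrelevant
            return true;
        }
        if (!recursive) {
            try (Stream<Path> children = Files.list(f)) {
                if (children.findAny().isPresent()) {
                    throw new IOException("Directory " + f + " is not empty");
                }
            }
            Files.delete(f);                 // empty directory
            return true;
        }
        // Recursive: a path sorts after its ancestors (an ancestor is a prefix of it),
        // so reverse order deletes descendants before their parents.
        List<Path> all;
        try (Stream<Path> walk = Files.walk(f)) {
            all = walk.sorted(Comparator.reverseOrder()).collect(Collectors.toList());
        }
        for (Path p : all) {
            Files.delete(p);
        }
        return true;
    }
}
```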
7. List the statuses of the files and subdirectories under a directory
```java
/**
 * List the statuses of the files/directories in the given path if the path is
 * a directory.
 * <p>
 * Does not guarantee to return the List of files/directories status in a
 * sorted order.
 * <p>
 * Will not return null. Expect IOException upon access error.
 * @param f given path
 * @return the statuses of the files/directories in the given patch
 * @throws FileNotFoundException when the path does not exist
 * @throws IOException see specific implementation
 */
public abstract FileStatus[] listStatus(Path f) throws FileNotFoundException,
                                                       IOException;
```
8. Set the current working directory of the file system; relative paths are resolved against it
```java
/**
 * Set the current working directory for the given FileSystem. All relative
 * paths will be resolved relative to it.
 *
 * @param new_dir Path of new working directory
 */
public abstract void setWorkingDirectory(Path new_dir);
```
9. Get the current working directory of the file system
```java
/**
 * Get the current working directory for the given FileSystem
 * @return the directory pathname
 */
public abstract Path getWorkingDirectory();
```
10. Create a directory with the given permission, including any missing parent directories (similar to mkdir -p)
```java
/**
 * Make the given file and all non-existent parents into
 * directories. Has roughly the semantics of Unix @{code mkdir -p}.
 * Existence of the directory hierarchy is not an error.
 * @param f path to create
 * @param permission to apply to f
 * @throws IOException IO failure
 */
public abstract boolean mkdirs(Path f, FsPermission permission
    ) throws IOException;
```
11. Get the status of the given file or directory
```java
/**
 * Return a file status object that represents the path.
 * @param f The path we want information from
 * @return a FileStatus object
 * @throws FileNotFoundException when the path does not exist
 * @throws IOException see specific implementation
 */
public abstract FileStatus getFileStatus(Path f) throws IOException;
```
Overloaded methods
1. The overload of open. IO_FILE_BUFFER_SIZE_KEY is "io.file.buffer.size" and IO_FILE_BUFFER_SIZE_DEFAULT is 4096, i.e. 4 KB.
```java
/**
 * Opens an FSDataInputStream at the indicated Path.
 * @param f the file to open
 * @throws IOException IO failure
 */
public FSDataInputStream open(Path f) throws IOException {
  return open(f, getConf().getInt(IO_FILE_BUFFER_SIZE_KEY,
      IO_FILE_BUFFER_SIZE_DEFAULT));
}
```
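This overload follows a pattern used throughout FileSystem. The short form reads a setting from the configuration, falls back to a default, and delegates to the full form that subclasses implement. The hypothetical sketch below isolates that pattern. OpenOverloadFs and EchoFs are illustrative names, a Map stands in for Configuration, and open returns a description String instead of an FSDataInputStream.

```java
import java.util.Map;

// Hypothetical sketch of the overload pattern; not Hadoop API.
abstract class OpenOverloadFs {
    static final String IO_FILE_BUFFER_SIZE_KEY = "io.file.buffer.size";
    static final int IO_FILE_BUFFER_SIZE_DEFAULT = 4096;

    private final Map<String, String> conf;

    OpenOverloadFs(Map<String, String> conf) {
        this.conf = conf;
    }

    /** Reads an int setting, falling back to the given default when it is absent. */
    int getInt(String key, int defaultValue) {
        String v = conf.get(key);
        return v == null ? defaultValue : Integer.parseInt(v.trim());
    }

    /** The full form that each concrete file system implements. */
    abstract String open(String path, int bufferSize);

    /** Convenience overload: fills in the configured (or default) buffer size. */
    String open(String path) {
        return open(path, getInt(IO_FILE_BUFFER_SIZE_KEY, IO_FILE_BUFFER_SIZE_DEFAULT));
    }
}

/** A trivial "file system" that just reports the arguments it received. */
final class EchoFs extends OpenOverloadFs {
    EchoFs(Map<String, String> conf) {
        super(conf);
    }

    @Override
    String open(String path, int bufferSize) {
        return path + " bufferSize=" + bufferSize;
    }
}
```

With an empty configuration, `new EchoFs(Map.of()).open("/data/in.txt")` reaches the full form with a buffer size of 4096. Setting io.file.buffer.size changes it without touching any caller.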
2. Overloads of create
2.1 By default, an existing file is overwritten.
```java
/**
 * Create an FSDataOutputStream at the indicated Path.
 * Files are overwritten by default.
 * @param f the file to create
 * @throws IOException IO failure
 */
public FSDataOutputStream create(Path f) throws IOException {
  return create(f, true);
}
```
2.1.1 create(f, overwrite) fills in the default buffer size, replication factor, and block size, then creates the FSDataOutputStream
```java
/**
 * Create an FSDataOutputStream at the indicated Path.
 * @param f the file to create
 * @param overwrite if a file with this name already exists, then if true,
 *   the file will be overwritten, and if false an exception will be thrown.
 * @throws IOException IO failure
 */
public FSDataOutputStream create(Path f, boolean overwrite)
    throws IOException {
  return create(f, overwrite,
                getConf().getInt(IO_FILE_BUFFER_SIZE_KEY,
                    IO_FILE_BUFFER_SIZE_DEFAULT),
                getDefaultReplication(f),
                getDefaultBlockSize(f));
}
```
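In the base FileSystem class, the default replication factor is 1. File systems such as HDFS override getDefaultReplication(); HDFS uses its dfs.replication setting.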
```java
/**
 * Get the default replication.
 * @return the replication; the default value is "1".
 * @deprecated use {@link #getDefaultReplication(Path)} instead
 */
@Deprecated
public short getDefaultReplication() { return 1; }

/**
 * Get the default replication for a path.
 * The given path will be used to locate the actual FileSystem to query.
 * The full path does not have to exist.
 * @param path of the file
 * @return default replication for the path's filesystem
 */
public short getDefaultReplication(Path path) {
  return getDefaultReplication();
}
```
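The base class's default block size is 32 MB, read from fs.local.block.size. HDFS overrides getDefaultBlockSize() with its own dfs.blocksize setting.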
```java
/**
 * Return the number of bytes that large input files should be optimally
 * be split into to minimize I/O time.
 * @deprecated use {@link #getDefaultBlockSize(Path)} instead
 */
@Deprecated
public long getDefaultBlockSize() {
  // default to 32MB: large enough to minimize the impact of seeks
  return getConf().getLong("fs.local.block.size", 32 * 1024 * 1024);
}

/**
 * Return the number of bytes that large input files should be optimally
 * be split into to minimize I/O time.  The given path will be used to
 * locate the actual filesystem.  The full path does not have to exist.
 * @param f path of file
 * @return the default block size for the path's filesystem
 */
public long getDefaultBlockSize(Path f) {
  return getDefaultBlockSize();
}
```
2.2 Create an FSDataOutputStream that reports write progress through the supplied Progressable callback
```java
/**
 * Create an FSDataOutputStream at the indicated Path with write-progress
 * reporting.
 * Files are overwritten by default.
 * @param f the file to create
 * @param progress to report progress
 * @throws IOException IO failure
 */
public FSDataOutputStream create(Path f, Progressable progress)
    throws IOException {
  return create(f, true,
                getConf().getInt(IO_FILE_BUFFER_SIZE_KEY,
                    IO_FILE_BUFFER_SIZE_DEFAULT),
                getDefaultReplication(f),
                getDefaultBlockSize(f), progress);
}
```
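The remaining create overloads mostly take an explicit or default buffer size, replication factor, and block size and use them to create the FSDataOutputStream. There is also a static variant that creates the file on a specified FileSystem. They are not all listed here.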
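The create overloads form a chain: create(f) becomes create(f, true), which fills in buffer size, replication and block size before reaching the full form. The hypothetical model below traces that chain. CreateChain is not Hadoop API, a Map stands in for Configuration, and the permission and progress arguments of the real full form are omitted. Its default block size works out to 32 * 1024 * 1024 = 33554432 bytes.

```java
import java.util.Map;

// Hypothetical model of FileSystem's create(...) overload chain; returns a
// description instead of an output stream.
class CreateChain {
    private final Map<String, String> conf;

    CreateChain(Map<String, String> conf) {
        this.conf = conf;
    }

    long getLong(String key, long defaultValue) {
        String v = conf.get(key);
        return v == null ? defaultValue : Long.parseLong(v.trim());
    }

    /** Base-class default; a subclass may override it. */
    short getDefaultReplication() {
        return 1;
    }

    /** 32 MB unless "fs.local.block.size" is set. */
    long getDefaultBlockSize() {
        return getLong("fs.local.block.size", 32 * 1024 * 1024);
    }

    /** create(f): overwrite by default. */
    String create(String f) {
        return create(f, true);
    }

    /** create(f, overwrite): fill in buffer size, replication and block size. */
    String create(String f, boolean overwrite) {
        int bufferSize = (int) getLong("io.file.buffer.size", 4096);
        return create(f, overwrite, bufferSize, getDefaultReplication(), getDefaultBlockSize());
    }

    /** Stand-in for the full form; permission and progress are omitted in this sketch. */
    String create(String f, boolean overwrite, int bufferSize, short replication, long blockSize) {
        return f + " overwrite=" + overwrite + " buffer=" + bufferSize
                + " replication=" + replication + " blockSize=" + blockSize;
    }
}
```

Because the defaults come from overridable methods, a subclass only has to override getDefaultReplication() (as DistributedFileSystem does) for every short create overload to pick up the new value.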
3. Overloads of append
3.1 Open the file with the default buffer size and append to the end of it
```java
/**
 * Append to an existing file (optional operation).
 * Same as
 * {@code append(f, getConf().getInt(IO_FILE_BUFFER_SIZE_KEY,
 *     IO_FILE_BUFFER_SIZE_DEFAULT), null)}
 * @param f the existing file to be appended.
 * @throws IOException IO failure
 * @throws UnsupportedOperationException if the operation is unsupported
 *         (default).
 */
public FSDataOutputStream append(Path f) throws IOException {
  return append(f, getConf().getInt(IO_FILE_BUFFER_SIZE_KEY,
      IO_FILE_BUFFER_SIZE_DEFAULT), null);
}
```
3.2 Open the file with a user-specified buffer size and append to the end of it
```java
/**
 * Append to an existing file (optional operation).
 * Same as append(f, bufferSize, null).
 * @param f the existing file to be appended.
 * @param bufferSize the size of the buffer to be used.
 * @throws IOException IO failure
 * @throws UnsupportedOperationException if the operation is unsupported
 *         (default).
 */
public FSDataOutputStream append(Path f, int bufferSize) throws IOException {
  return append(f, bufferSize, null);
}
```
4. Overloads of mkdirs
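4.1 Create the directory with the default directory permission, 00777. The client's umask is normally applied to this permission when the directory is created.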
```java
/**
 * Call {@link #mkdirs(Path, FsPermission)} with default permission.
 * @param f path
 * @return true if the directory was created
 * @throws IOException IO failure
 */
public boolean mkdirs(Path f) throws IOException {
  return mkdirs(f, FsPermission.getDirDefault());
}
```
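4.2 A static helper that creates the directory on a given FileSystem. It first creates the directory with the default permission and then sets its permission to the one supplied by the caller, so the result is not masked by the umask.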
```java
/**
 * Create a directory with the provided permission.
 * The permission of the directory is set to be the provided permission as in
 * setPermission, not permission&~umask
 *
 * @see #create(FileSystem, Path, FsPermission)
 *
 * @param fs FileSystem handle
 * @param dir the name of the directory to be created
 * @param permission the permission of the directory
 * @return true if the directory creation succeeds; false otherwise
 * @throws IOException A problem creating the directories.
 */
public static boolean mkdirs(FileSystem fs, Path dir, FsPermission permission)
    throws IOException {
  // create the directory using the default permission
  boolean result = fs.mkdirs(dir);
  // set its permission to be the supplied one
  fs.setPermission(dir, permission);
  return result;
}
```
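The same "create, then set the permission explicitly" idea can be reproduced on a local POSIX file system (Linux or macOS) with java.nio. An explicit chmod is not filtered by the process umask, so a group-writable mode such as rwxrwx--- survives even under a typical 022 umask. MkdirsThenChmod is a hypothetical name, and the string form of the permission is an assumption of this sketch rather than Hadoop's FsPermission.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.attribute.PosixFilePermissions;

// Hypothetical local-disk analogue of the static mkdirs(fs, dir, permission) helper.
// Requires a POSIX file system.
final class MkdirsThenChmod {
    static boolean mkdirs(Path dir, String perms) throws IOException {
        // Like "mkdir -p": missing parents are created, an existing directory is fine.
        Files.createDirectories(dir);
        // Explicit chmod on the leaf directory only; not masked by the umask.
        Files.setPosixFilePermissions(dir, PosixFilePermissions.fromString(perms));
        return true;
    }
}
```

As in the Hadoop helper, only the leaf directory receives the requested permission. Parent directories created along the way keep the default (umask-filtered) permission.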
5. Overloads of listStatus
5.1 Filter the entries under the given path with the specified filter, and add the status of each accepted entry to the caller-supplied ArrayList.
```java
/**
 * Filter files/directories in the given path using the user-supplied path
 * filter. Results are added to the given array <code>results</code>.
 * @throws FileNotFoundException when the path does not exist
 * @throws IOException see specific implementation
 */
private void listStatus(ArrayList<FileStatus> results, Path f,
    PathFilter filter) throws FileNotFoundException, IOException {
  FileStatus listing[] = listStatus(f);
  Preconditions.checkNotNull(listing, "listStatus should not return NULL");
  for (int i = 0; i < listing.length; i++) {
    if (filter.accept(listing[i].getPath())) {
      results.add(listing[i]);
    }
  }
}
```
5.2 The difference from the method above is that the result list is created internally rather than supplied by the caller.
```java
public FileStatus[] listStatus(Path f, PathFilter filter)
                                 throws FileNotFoundException, IOException {
  ArrayList<FileStatus> results = new ArrayList<>();
  listStatus(results, f, filter);
  return results.toArray(new FileStatus[results.size()]);
}
```
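5.3 Filter the entries under each of the given paths with the default filter, which accepts every path, and store their statuses in an internally created list.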
```java
/**
 * Filter files/directories in the given list of paths using default
 * path filter.
 * <p>
 * Does not guarantee to return the List of files/directories status in a
 * sorted order.
 *
 * @param files
 *          a list of paths
 * @return a list of statuses for the files under the given paths after
 *         applying the filter default Path filter
 * @throws FileNotFoundException when the path does not exist
 * @throws IOException see specific implementation
 */
public FileStatus[] listStatus(Path[] files)
    throws FileNotFoundException, IOException {
  return listStatus(files, DEFAULT_FILTER);
}
```
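5.4 Filter the entries under each of the given paths with the specified filter, and store the statuses of the accepted entries in an internally created list.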
```java
public FileStatus[] listStatus(Path[] files, PathFilter filter)
    throws FileNotFoundException, IOException {
  ArrayList<FileStatus> results = new ArrayList<FileStatus>();
  for (int i = 0; i < files.length; i++) {
    listStatus(results, files[i], filter);
  }
  return results.toArray(new FileStatus[results.size()]);
}
```
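Taken together, the listStatus overloads list one level under each given path, keep what the filter accepts, and fall back to an accept-everything default filter. The sketch below is a hypothetical local-disk analogue. FilteredListing is not Hadoop API, java.util.function.Predicate stands in for PathFilter, and each given path is assumed to be a directory. As with the Javadoc above, the result order is not guaranteed.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;
import java.util.stream.Stream;

// Hypothetical local-disk analogue of listStatus(Path[] files, PathFilter filter).
final class FilteredListing {
    /** Accepts every path, like the default filter used by listStatus(Path[]). */
    static final Predicate<Path> DEFAULT_FILTER = p -> true;

    static List<Path> listStatus(Path[] dirs, Predicate<Path> filter) throws IOException {
        List<Path> results = new ArrayList<>();
        for (Path dir : dirs) {
            try (Stream<Path> children = Files.list(dir)) { // direct children only
                children.filter(filter).forEach(results::add);
            }
        }
        return results;
    }

    static List<Path> listStatus(Path[] dirs) throws IOException {
        return listStatus(dirs, DEFAULT_FILTER);
    }
}
```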
6. Overloads of copyFromLocalFile
6.1 Copy the file at src on the local disk to dst, keeping the source file
```java
/**
 * The src file is on the local disk.  Add it to filesystem at
 * the given dst name and the source is kept intact afterwards
 * @param src path
 * @param dst path
 * @throws IOException IO failure
 */
public void copyFromLocalFile(Path src, Path dst)
  throws IOException {
  copyFromLocalFile(false, src, dst);
}
```
6.2 The caller-specified delSrc flag determines whether the source file is deleted
```java
/**
 * The src file is on the local disk.  Add it to the filesystem at
 * the given dst name.
 * delSrc indicates if the source should be removed
 * @param delSrc whether to delete the src
 * @param src path
 * @param dst path
 */
public void copyFromLocalFile(boolean delSrc, Path src, Path dst)
  throws IOException {
  copyFromLocalFile(delSrc, true, src, dst);
}
```
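6.3 delSrc determines whether the source files are deleted, and overwrite determines whether existing files at dst are overwritten. This variant copies an array of local source paths.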
```java
/**
 * The src files are on the local disk.  Add it to the filesystem at
 * the given dst name.
 * delSrc indicates if the source should be removed
 * @param delSrc whether to delete the src
 * @param overwrite whether to overwrite an existing file
 * @param srcs array of paths which are source
 * @param dst path
 * @throws IOException IO failure
 */
public void copyFromLocalFile(boolean delSrc, boolean overwrite,
                              Path[] srcs, Path dst)
  throws IOException {
  Configuration conf = getConf();
  FileUtil.copy(getLocal(conf), srcs, this, dst, delSrc, overwrite, conf);
}
```
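The two flags can be illustrated with a hypothetical single-machine analogue built on java.nio. Hadoop's version delegates to FileUtil.copy between two FileSystem instances and also handles directories. This sketch, LocalCopy, handles single files only. The source is deleted only after the copy has succeeded, and without overwrite an existing target makes the copy fail.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;

// Hypothetical single-file analogue of copyFromLocalFile(delSrc, overwrite, src, dst).
final class LocalCopy {
    static void copy(boolean delSrc, boolean overwrite, Path src, Path dst) throws IOException {
        if (overwrite) {
            Files.copy(src, dst, StandardCopyOption.REPLACE_EXISTING);
        } else {
            Files.copy(src, dst); // throws FileAlreadyExistsException if dst exists
        }
        if (delSrc) {
            Files.delete(src);    // only reached when the copy succeeded
        }
    }

    /** Like copyFromLocalFile(src, dst): keep the source and overwrite the target. */
    static void copy(Path src, Path dst) throws IOException {
        copy(false, true, src, dst);
    }
}
```

With delSrc set to true the helper behaves like a move.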
7. moveToLocalFile: implemented by calling copyToLocalFile with delSrc set to true, so the remote source is deleted after a successful copy
```java
/**
 * Copy a file to the local filesystem, then delete it from the
 * remote filesystem (if successfully copied).
 * @param src path src file in the remote filesystem
 * @param dst path local destination
 * @throws IOException IO failure
 */
public void moveToLocalFile(Path src, Path dst) throws IOException {
  copyToLocalFile(true, src, dst);
}
```
8. Copy the file at src on the remote file system to dst on the local file system; delSrc determines whether the source file is deleted
```java
/**
 * Copy it a file from a remote filesystem to the local one.
 * delSrc indicates if the src will be removed or not.
 * @param delSrc whether to delete the src
 * @param src path src file in the remote filesystem
 * @param dst path local destination
 * @throws IOException IO failure
 */
public void copyToLocalFile(boolean delSrc, Path src, Path dst)
  throws IOException {
  copyToLocalFile(delSrc, src, dst, false);
}
```