掌握HDFS的Java API接口訪問

     HDFS設計的主要目的是對海量數據進行存儲,也就是說在其上可以存儲很大量文件(能夠存儲TB級的文件)。HDFS將這些文件分割以後,存儲在不一樣的DataNode上, HDFS 提供了兩種訪問接口:Shell接口和Java API 接口,對HDFS裏面的文件進行操做,具體每一個Block放在哪臺DataNode上面,對於開發者來講是透明的。node

 

一、獲取文件系統windows

複製代碼
 1 /**
 2  * 獲取文件系統
 3  * 
 4  * @return FileSystem
 5  */
 6 public static FileSystem getFileSystem() {
 7     //讀取配置文件
 8     Configuration conf = new Configuration();
 9     // 文件系統
10     FileSystem fs = null;
11     
12     String hdfsUri = HDFSUri;
13     if(StringUtils.isBlank(hdfsUri)){
14         // 返回默認文件系統  若是在 Hadoop集羣下運行,使用此種方法可直接獲取默認文件系統
15         try {
16             fs = FileSystem.get(conf);
17         } catch (IOException e) {
18             logger.error("", e);
19         }
20     }else{
21         // 返回指定的文件系統,若是在本地測試,須要使用此種方法獲取文件系統
22         try {
23             URI uri = new URI(hdfsUri.trim());
24             fs = FileSystem.get(uri,conf);
25         } catch (URISyntaxException | IOException e) {
26             logger.error("", e);
27         }
28     }
29         
30     return fs;
31 }
複製代碼

 二、建立文件目錄分佈式

複製代碼
 1 /**
 2  * 建立文件目錄
 3  * 
 4  * @param path
 5  */
 6 public static void mkdir(String path) {
 7     try {
 8         // 獲取文件系統
 9         FileSystem fs = getFileSystem();
10         
11         String hdfsUri = HDFSUri;
12         if(StringUtils.isNotBlank(hdfsUri)){
13             path = hdfsUri + path;
14         }
15         
16         // 建立目錄
17         fs.mkdirs(new Path(path));
18         
19         //釋放資源
20         fs.close();
21     } catch (IllegalArgumentException | IOException e) {
22         logger.error("", e);
23     }
24 }
複製代碼

三、刪除文件或者文件目錄oop

複製代碼
 1 /**
 2  * 刪除文件或者文件目錄
 3  * 
 4  * @param path
 5  */
 6 public static void rmdir(String path) {
 7     try {
 8         // 返回FileSystem對象
 9         FileSystem fs = getFileSystem();
10         
11         String hdfsUri = HDFSUri;
12         if(StringUtils.isNotBlank(hdfsUri)){
13             path = hdfsUri + path;
14         }
15         
16         // 刪除文件或者文件目錄  delete(Path f) 此方法已經棄用
17         fs.delete(new Path(path),true);
18         
19         // 釋放資源
20         fs.close();
21     } catch (IllegalArgumentException | IOException e) {
22         logger.error("", e);
23     }
24 }
複製代碼

三、根據filter獲取目錄下的文件測試

複製代碼
 1 /**
 2  * 根據filter獲取目錄下的文件
 3  * 
 4  * @param path
 5  * @param pathFilter
 6  * @return String[]
 7  */
 8 public static String[] ListFile(String path,PathFilter pathFilter) {
 9     String[] files = new String[0];
10     
11     try {
12         // 返回FileSystem對象
13         FileSystem fs = getFileSystem();
14         
15         String hdfsUri = HDFSUri;
16         if(StringUtils.isNotBlank(hdfsUri)){
17             path = hdfsUri + path;
18         }
19         
20         FileStatus[] status;
21         if(pathFilter != null){
22             // 根據filter列出目錄內容
23             status = fs.listStatus(new Path(path),pathFilter);
24         }else{
25             // 列出目錄內容
26             status = fs.listStatus(new Path(path));
27         }
28         
29         // 獲取目錄下的全部文件路徑
30         Path[] listedPaths = FileUtil.stat2Paths(status);
31         // 轉換String[]
32         if (listedPaths != null && listedPaths.length > 0){
33             files = new String[listedPaths.length];
34             for (int i = 0; i < files.length; i++){
35                 files[i] = listedPaths[i].toString();
36             }
37         }
38         // 釋放資源
39         fs.close();
40     } catch (IllegalArgumentException | IOException e) {
41         logger.error("", e);
42     }
43     
44     return files;
45 }
複製代碼

四、文件上傳至 HDFSspa

複製代碼
 1 /**
 2  * 文件上傳至 HDFS
 3  * 
 4  * @param delSrc
 5  * @param overwrite
 6  * @param srcFile
 7  * @param destPath
 8  */
 9 public static void copyFileToHDFS(boolean delSrc, boolean overwrite,String srcFile,String destPath) {
10     // 源文件路徑是Linux下的路徑,若是在 windows 下測試,須要改寫爲Windows下的路徑,好比D://hadoop/djt/weibo.txt
11     Path srcPath = new Path(srcFile);
12     
13     // 目的路徑
14     String hdfsUri = HDFSUri;
15     if(StringUtils.isNotBlank(hdfsUri)){
16         destPath = hdfsUri + destPath;
17     }
18     Path dstPath = new Path(destPath);
19     
20     // 實現文件上傳
21     try {
22         // 獲取FileSystem對象
23         FileSystem fs = getFileSystem();
24         fs.copyFromLocalFile(srcPath, dstPath);
25         fs.copyFromLocalFile(delSrc,overwrite,srcPath, dstPath);
26         //釋放資源
27         fs.close();
28     } catch (IOException e) {
29         logger.error("", e);
30     }
31 }
複製代碼

五、從 HDFS 下載文件設計

複製代碼
 1 /**
 2  * 從 HDFS 下載文件
 3  * 
 4  * @param srcFile
 5  * @param destPath
 6  */
 7 public static void getFile(String srcFile,String destPath) {
 8     // 源文件路徑
 9     String hdfsUri = HDFSUri;
10     if(StringUtils.isNotBlank(hdfsUri)){
11         srcFile = hdfsUri + srcFile;
12     }
13     Path srcPath = new Path(srcFile);
14     
15     // 目的路徑是Linux下的路徑,若是在 windows 下測試,須要改寫爲Windows下的路徑,好比D://hadoop/djt/
16     Path dstPath = new Path(destPath);
17     
18     try {
19         // 獲取FileSystem對象
20         FileSystem fs = getFileSystem();
21         // 下載hdfs上的文件
22         fs.copyToLocalFile(srcPath, dstPath);
23         // 釋放資源
24         fs.close();
25     } catch (IOException e) {
26         logger.error("", e);
27     }
28 }
複製代碼

六、獲取 HDFS 集羣節點信息code

複製代碼
 1 /**
 2  * 獲取 HDFS 集羣節點信息
 3  * 
 4  * @return DatanodeInfo[]
 5  */
 6 public static DatanodeInfo[] getHDFSNodes() {
 7     // 獲取全部節點
 8     DatanodeInfo[] dataNodeStats = new DatanodeInfo[0];
 9     
10     try {
11         // 返回FileSystem對象
12         FileSystem fs = getFileSystem();
13         
14         // 獲取分佈式文件系統
15         DistributedFileSystem hdfs = (DistributedFileSystem)fs;
16         
17         dataNodeStats = hdfs.getDataNodeStats();
18     } catch (IOException e) {
19         logger.error("", e);
20     }
21     return dataNodeStats;
22 }
複製代碼

七、查找某個文件在 HDFS集羣的位置對象

複製代碼
 1 /**
 2  * 查找某個文件在 HDFS集羣的位置
 3  * 
 4  * @param filePath
 5  * @return BlockLocation[]
 6  */
 7 public static BlockLocation[] getFileBlockLocations(String filePath) {
 8     // 文件路徑
 9     String hdfsUri = HDFSUri;
10     if(StringUtils.isNotBlank(hdfsUri)){
11         filePath = hdfsUri + filePath;
12     }
13     Path path = new Path(filePath);
14     
15     // 文件塊位置列表
16     BlockLocation[] blkLocations = new BlockLocation[0];
17     try {
18         // 返回FileSystem對象
19         FileSystem fs = getFileSystem();
20         // 獲取文件目錄 
21         FileStatus filestatus = fs.getFileStatus(path);
22         //獲取文件塊位置列表
23         blkLocations = fs.getFileBlockLocations(filestatus, 0, filestatus.getLen());
24     } catch (IOException e) {
25         logger.error("", e);
26     }
27     return blkLocations;
28 }
複製代碼

 八、文件重命名blog

複製代碼
 1 /**
 2  * 文件重命名
 3  * 
 4  * @param srcPath
 5  * @param dstPath
 6  */
 7 public boolean rename(String srcPath, String dstPath){
 8     boolean flag = false;
 9     try    {
10         // 返回FileSystem對象
11         FileSystem fs = getFileSystem();
12         
13         String hdfsUri = HDFSUri;
14         if(StringUtils.isNotBlank(hdfsUri)){
15             srcPath = hdfsUri + srcPath;
16             dstPath = hdfsUri + dstPath;
17         }
18         
19         flag = fs.rename(new Path(srcPath), new Path(dstPath));
20     } catch (IOException e) {
21         logger.error("{} rename to {} error.", srcPath, dstPath);
22     }
23     
24     return flag;
25 }
複製代碼

九、判斷目錄是否存在

複製代碼
 1 /**
 2  * 判斷目錄是否存在
 3  * 
 4  * @param srcPath
 5  * @param dstPath
 6  */
 7 public boolean existDir(String filePath, boolean create){
 8     boolean flag = false;
 9     
10     if (StringUtils.isEmpty(filePath)){
11         return flag;
12     }
13     
14     try{
15         Path path = new Path(filePath);
16         // FileSystem對象
17         FileSystem fs = getFileSystem();
18         
19         if (create){
20             if (!fs.exists(path)){
21                 fs.mkdirs(path);
22             }
23         }
24         
25         if (fs.isDirectory(path)){
26             flag = true;
27         }
28     }catch (Exception e){
29         logger.error("", e);
30     }
31     
32     return flag;
33 }
複製代碼

     10  查看HDFS文件的最後修改時間  

  1. public void testgetModifyTime() throws Exception {  
  2.         Configuration conf = new Configuration();  
  3.         FileSystem hdfs = FileSystem.get(conf);  
  4.         Path dst = new Path(hdfsPath);  
  5.         FileStatus files[] = hdfs.listStatus(dst);  
  6. for (FileStatus file : files) {  
  7.             System.out.println(file.getPath() + "\t"  
  8.                     + file.getModificationTime());  
  9.             System.out.println(file.getPath() + "\t"  
  10.                     + new Date(file.getModificationTime()));  
  11.         } 

   

  1.    // 查看HDFS文件是否存在  
  2.   
  3.     public void testExists() throws Exception {  
  4.   
  5.         Configuration conf = new Configuration();  
  6.           
  7.         FileSystem hdfs = FileSystem.get(conf);  
  8.         Path dst = new Path(hdfsPath + "file01.txt");  
  9.   
  10.         boolean ok = hdfs.exists(dst);  
  11.         System.out.println(ok ? "文件存在" : "文件不存在");  
  12.     }  

    

  1.     // 獲取HDFS集羣上全部節點名稱  
  2.     public void testGetHostName() throws Exception {  
  3.   
  4.         Configuration conf = new Configuration();  
  5.           
  6.         DistributedFileSystem hdfs = (DistributedFileSystem) FileSystem  
  7.                 .get(conf);  
  8.         DatanodeInfo[] dataNodeStats = hdfs.getDataNodeStats();  
  9.   
  10.         for (DatanodeInfo dataNode : dataNodeStats) {  
  11.             System.out.println(dataNode.getHostName() + "\t"  
  12.                     + dataNode.getName());  
  13.         }  
  14.     }
相關文章
相關標籤/搜索