HDFS初探之旅(二)

 

六、HDFS API詳解                                              html

  Hadoop中關於文件操做類疾病上所有在「org.apache.hadoop.fs」包中,這些API可以支持的操做包含:打開文件、讀寫文件、刪除文件等。node

  Hadoop類庫中最終面向用戶提供的接口類FileSystem,該類是個抽象類,只能經過該類的get方法得當具體的類。get方法存在幾個重載版本,經常使用的是這個:apache

  

  該類幾乎封裝了全部的文件操做,例如mkdir、delete等。綜上基本上能夠得出操做文件的程序庫框架:瀏覽器

   

  

6.1 上傳本地文件緩存

  經過「FileSystem.copyFromLocalFile(Path src, Path dst)」可將本地文件上傳到HDFS的指定位置上,其中src和dst均爲文件的完成路徑。具體事例以下:框架

package com.hebut.file;

import org.apache.hadoop.conf.Configuration;

import org.apache.hadoop.fs.FileStatus;

import org.apache.hadoop.fs.FileSystem;

import org.apache.hadoop.fs.Path;

public class CopyFile {

    public static void main(String[] args) throws Exception {

        Configuration conf=new Configuration();

        FileSystem hdfs=FileSystem.get(conf);

       
        //本地文件

        Path src =new Path("D:\\HebutWinOS");

        //HDFS爲止

        Path dst =new Path("/");


        hdfs.copyFromLocalFile(src, dst);

        System.out.println("Upload to"+conf.get("fs.default.name"));

       
        FileStatus files[]=hdfs.listStatus(dst);

        for(FileStatus file:files){

            System.out.println(file.getPath());

        }

    }

}

  運行結果能夠經過控制檯、項目瀏覽器和SecureCRT查看,以下圖所示。函數

  1)控制檯結果oop

  

  2)項目瀏覽器spa

  

  3)SecureCRT結果3d

  

  

6.2 建立HDFS文件

  經過「FileSystem.create(Path f)"可在HDFS上建立文件,其中f爲文件的完整路徑。具體實現以下:

package com.hebut.file;

 
import org.apache.hadoop.conf.Configuration;

import org.apache.hadoop.fs.FSDataOutputStream;

import org.apache.hadoop.fs.FileSystem;

import org.apache.hadoop.fs.Path;


public class CreateFile {

    public static void main(String[] args) throws Exception {

        Configuration conf=new Configuration();

        FileSystem hdfs=FileSystem.get(conf);

        byte[] buff="hello hadoop world!\n".getBytes();

        Path dfs=new Path("/test");

        FSDataOutputStream outputStream=hdfs.create(dfs);

        outputStream.write(buff,0,buff.length);
    }

}

  運行結果,以下圖所示。

  1)項目瀏覽器

  

  2)SecureCRT結果

  

 

6.3 建立HDFS目錄

  經過」FileSystem.mkdirs(Path f)「可在HDFS上建立文件夾,其中f爲文件夾的完整路徑。具體實現以下所示。

package com.hebut.dir;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; public class CreateDir { public static void main(String[] args) throws Exception{ Configuration conf=new Configuration(); FileSystem hdfs=FileSystem.get(conf); Path dfs=new Path("/TestDir"); hdfs.mkdirs(dfs); } }

  運行結果,以下圖所示。

  1)項目瀏覽器

  

  2)SecureCRT結果

  

 

6.4 重命名HDFS文件

  經過」FileSystem.rename(Path src, Path dst)「可爲指定的HDFS文件重命名,其中src和dst均爲文件的完整路徑,具體實現以下所示。

package com.hebut.file;

import org.apache.hadoop.conf.Configuration;

import org.apache.hadoop.fs.FileSystem;

import org.apache.hadoop.fs.Path;

public class Rename{

    public static void main(String[] args) throws Exception {

        Configuration conf=new Configuration();

        FileSystem hdfs=FileSystem.get(conf);

        Path frpaht=new Path("/test");    //舊的文件名

        Path topath=new Path("/test1");    //新的文件名

        boolean isRename=hdfs.rename(frpaht, topath);

        String result=isRename?"成功":"失敗";

        System.out.println("文件重命名結果爲:"+result);
    }
}

  運行結果,以下圖所示。

  1)項目瀏覽器

  

  2)SecureCRT結果

  

 

6.5 刪除HDFS上的文件

  經過」FileSystem.delete(Path f, Boolean recursive)「可刪除指定的HDFS文件,其中f爲須要刪除的完整路徑,recuresive用來肯定是否進行遞歸刪除。具體實現以下:

package com.hebut.file;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; public class DeleteFile { public static void main(String[] args) throws Exception { Configuration conf=new Configuration(); FileSystem hdfs=FileSystem.get(conf);
Path delef
=new Path("/test1"); boolean isDeleted=hdfs.delete(delef,false); //遞歸刪除 //boolean isDeleted=hdfs.delete(delef,true); System.out.println("Delete?"+isDeleted); } }

  運行結果,以下圖所示。

  1)控制檯結果

  

  2)項目瀏覽器

  

  

6.6 刪除HDFS上的目錄

  同刪除文件代碼同樣,只是緩存刪除目錄路徑便可,若是目錄下有文件,要進行遞歸刪除。

 

6.7 查看某個HDFS文件是否存在

  經過」FileSystem.exists(Path f)「可查看指定HDFS文件是否存在,其中f爲文件的完整路徑。具體事項以下:

package com.hebut.file;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; public class CheckFile { public static void main(String[] args) throws Exception { Configuration conf=new Configuration(); FileSystem hdfs=FileSystem.get(conf); Path findf=new Path("/test1"); boolean isExists=hdfs.exists(findf); System.out.println("Exist?"+isExists); } }

  運行結果,以下圖所示。

  1)控制檯結果

  

  2)項目瀏覽器

  

 

6.8 查看HDFS文件的最後修改時間

  經過」FileSystem.getModificationTime()「可查看指定HDFS文件的修改時間。具體實現以下:

package com.hebut.file;

import org.apache.hadoop.conf.Configuration;

import org.apache.hadoop.fs.FileStatus;

import org.apache.hadoop.fs.FileSystem;

import org.apache.hadoop.fs.Path;

public class GetLTime {

    public static void main(String[] args) throws Exception {

        Configuration conf=new Configuration();

        FileSystem hdfs=FileSystem.get(conf);

        Path fpath =new Path("/user/hadoop/test/file1.txt");

        FileStatus fileStatus=hdfs.getFileStatus(fpath);

        long modiTime=fileStatus.getModificationTime();

        System.out.println("file1.txt的修改時間是"+modiTime);
    }
}

  運行結果,以下圖所示。

  1)控制檯結果

  

  

6.9 讀取HDFS某個目錄下的全部文件

  經過」FileStatus.getPath()「能夠查看指定HDFS中某個目錄下全部文件。具體實現以下所示。

package com.hebut.file;

import org.apache.hadoop.conf.Configuration;

import org.apache.hadoop.fs.FileStatus;

import org.apache.hadoop.fs.FileSystem;

import org.apache.hadoop.fs.Path;

public class ListAllFile {

    public static void main(String[] args) throws Exception {

        Configuration conf=new Configuration();

        FileSystem hdfs=FileSystem.get(conf);

        Path listf =new Path("/user/hadoop/test");      

        FileStatus stats[]=hdfs.listStatus(listf);

        for(int i = 0; i < stats.length; ++i)
     {
       System.out.println(stats[i].getPath().toString());
     }
        hdfs.close();
    }
}

  運行結果,以下圖所示。

  1)控制檯結果

  

  2)項目瀏覽器

  

 

6.10 查找某個文件在HDFS集羣的位置

  經過」FileSystem.getFileBlockLocation(FileStatus file, long start, long len)「可查找指定文件在HDFS集羣上的位置,其中file爲文件的完整路徑,start和len來標識查找文件的路徑。具體實現以下:

package com.hebut.file;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.BlockLocation; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; public class FileLoc { public static void main(String[] args) throws Exception { Configuration conf=new Configuration(); FileSystem hdfs=FileSystem.get(conf); Path fpath=new Path("/user/hadoop/cygwin"); FileStatus filestatus = hdfs.getFileStatus(fpath); BlockLocation[] blkLocations = hdfs.getFileBlockLocations(filestatus, 0, filestatus.getLen()); int blockLen = blkLocations.length; for(int i=0;i<blockLen;i++){ String[] hosts = blkLocations[i].getHosts(); System.out.println("block_"+i+"_location:"+hosts[0]); } } }

  運行結果,以下圖所示。

  1)控制檯結果

 

  

6.11 獲取HDFS集羣上全部節點名稱信息

  經過」DatanodeInfo.getHostName()「可獲取HDFS集羣上的全部節點名稱。具體實現以下:

package com.hebut.file;

import org.apache.hadoop.conf.Configuration;

import org.apache.hadoop.fs.FileSystem;

import org.apache.hadoop.hdfs.DistributedFileSystem;

import org.apache.hadoop.hdfs.protocol.DatanodeInfo;

public class GetList {

    public static void main(String[] args) throws Exception {

        Configuration conf=new Configuration();

        FileSystem fs=FileSystem.get(conf);

        DistributedFileSystem hdfs = (DistributedFileSystem)fs;

        DatanodeInfo[] dataNodeStats = hdfs.getDataNodeStats();

        for(int i=0;i<dataNodeStats.length;i++){

            System.out.println("DataNode_"+i+"_Name:"+dataNodeStats[i].getHostName());
        }
    }
}

  運行結果,以下圖所示。

  1)控制檯結果

  

 

七、HDFS的讀寫數據流                                          

7.1 文件的讀取剖析

  

  文件讀取的過程以下:

  1)解釋一

  • 客戶端(client)用FileSystem的open()函數打開文件;
  • DistributedFileSystem用RPC調用元數據節點,獲得文件的數據塊信息;
  • 對於每個數據塊,元數據節點返回保存保存數據塊的數據節點的地址;
  • DistributedFileSystem返回FSDataInputStream給客戶端,用來讀取數據;
  • 客戶端調用stream的read()函數開始讀取數據;
  • DFSInputStream鏈接保存此文件第一個數據塊的最近的數據節點;
  • Data從數據節點讀到客戶端(client);
  • 當此數據塊讀取完畢後,DFSInputStream關閉和此數據節點的鏈接,而後鏈接此文件下一個數據塊的最近的數據節點;
  • 當客戶端讀取完畢數據的時候,調用FSDataInputStream的close函數;
  • 在讀取數據的過程當中,若是客戶端在與數據點通訊出現錯誤,則嘗試鏈接包含此數據塊的下一個數據節點;
  • 失敗的數據節點將被記錄,之後再也不鏈接;  

  2)解釋二

  • 使用HDFS提供的客戶端開發庫,向遠程的NameNode發起RPC請求;
  • NameNode會視狀況返回文件的部分或所有的block列表,對於每一個block,Namenode都會返回有該block拷貝的datanode地址;
  • 客戶端開發庫會選取離客戶端最接近的datanode來讀取block;
  • 讀取完當前block數據後,關閉與當前datanode鏈接,併爲讀取下一個block尋找最佳的datanode;
  • 當讀完列表的block後,且文件讀取尚未結束,客戶端開發庫會繼續向NameNode獲取下一批的block列表;
  • 讀取完一個block都會進行checksum驗證,若是讀取datanode時出現錯誤,客戶端會通知Namenode,而後再從下一個擁有該block拷貝的datanode繼續讀;  

 

7.2 文件的寫入剖析

  

  寫入文件的過程比讀取較爲複雜:

  1)解釋一

  • 客戶端調用create()來建立文件
  • DistributedFileSystem用RPC調用元數據節點,在文件系統的命名空間中建立一個新的文件。
  • 元數據節點首先肯定文件原來不存在,而且客戶端有建立文件的權限,而後建立新文件。
  • DistributedFileSystem返回DFSOutputStream,客戶端用於寫數據。
  • 客戶端開始寫入數據,DFSOutputStream將數據分紅塊,寫入data queue。
  • Data queue由Data Streamer讀取,並通知 元數據節點分配 數據節點,用來存儲數據塊(每塊默認複製3塊)。分配的數據節點放在一個pipeline裏。
  • Data Streamer將數據塊寫入pipeline中的第一個數據節點。第一個數據節點將數據塊發送給第二個數據節點。第二個數據節點將數據發送給第三個數據節點。
  • DFSOutputStream爲發出去的數據塊保存了ack queue,等待pipeline中的數據節點告知數據已經寫入成功。
  • 若是數據節點在寫入的過程當中失敗:
    • 關閉pipeline,將ack queue中的數據塊放入data queue的開始。
    • 當前的數據塊在已經寫入的數據節點中被元數據節點賦予新的標示,則錯誤節點重啓後可以察覺其數據塊是過期的,會被刪除。
    • 失敗的數據節點從pipeline中移除,另外的數據塊則寫入pipeline中的另外兩個數據節點。
    • 元數據節點則被通知此數據塊是複製塊數不足,未來會再建立第三份備份。
  • 當客戶端結束寫入數據,則調用stream的close函數。此操做將全部的數據塊寫入pipeline中的數據節點,並等待ack queue返回成功。 最後通知元數據節點寫入完畢。

  2)解釋二

  • 使用HDFS提供的客戶端開發庫,向遠程的Namenode發起RPC請求;
  • Namenode會檢查要建立的文件是否已經存在,建立者是否有權限進行操做,成功則會爲文件建立一個記錄,不然會讓客戶端拋出異常;
  • 當客戶端開始寫入文件的時候,開發庫會將文件切分紅多個packets,並在內部以"data queue"的形式管理這些packets,並向Namenode申請新的blocks,獲取用來存儲replicas的合適的datanodes列表,列表的大小根據在Namenode中對replication的設置而定。
  • 開始以pipeline(管道)的形式將packet寫入全部的replicas中。開發庫把packet以流的方式寫入第一個datanode,該datanode把該packet存儲以後,再將其傳遞給在此pipeline中的下一個datanode,直到最後一個datanode,這種寫數據的方式呈流水線的形式。
  • 最後一個datanode成功存儲以後會返回一個ack packet,在pipeline裏傳遞至客戶端,在客戶端的開發庫內部維護着"ack queue",成功收到datanode返回的ack packet後會從"ack queue"移除相應的packet。
  • 若是傳輸過程當中,有某個datanode出現了故障,那麼當前的pipeline會被關閉,出現故障的datanode會從當前的pipeline中移除,剩餘的block會繼續剩下的datanode中繼續以pipeline的形式傳輸,同時Namenode會分配一個新的datanode,保持replicas設定的數量。

 

 

 

  這部分須要實踐,先記錄下來,等週末實踐下。

  感謝做者,原文連接:http://www.cnblogs.com/xia520pi/archive/2012/05/28/2520813.html

相關文章
相關標籤/搜索