Hadoop- HDFS的API操做

 

一、引入依賴java

<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-client</artifactId>
    <version>2.6.1</version>
</dependency>

注:如需手動引入jar包,hdfs的jar包----hadoop的安裝目錄的share下node

二、window下開發的說明linux

建議在linux下進行hadoop應用的開發,不會存在兼容性問題。如在window上作客戶端應用開發,須要設置如下環境:web

A、windows的某個目錄下解壓一個hadoop的安裝包redis

B、將安裝包下的lib和bin目錄用對應windows版本平臺編譯的本地庫替換apache

C、window系統中配置HADOOP_HOME指向你解壓的安裝包編程

D、windows系統的path變量中加入hadoop的bin目錄windows

 

java中操做hdfs,首先要得到一個客戶端實例api

Configuration conf = new Configuration()

FileSystem fs = FileSystem.get(conf)

 

而咱們的操做目標是HDFS,因此獲取到的fs對象應該是DistributedFileSystem的實例; 服務器

get方法是從何處判斷具體實例化那種客戶端類呢?

——從conf中的一個參數 fs.defaultFS的配置值判斷;

若是咱們的代碼中沒有指定fs.defaultFS,而且工程classpath下也沒有給定相應的配置,conf中的默認值就來自於hadoop的jar包中的core-default.xml,默認值爲: file:///,則獲取的將不是一個DistributedFileSystem的實例,而是一個本地文件系統的客戶端對象;

 

 文件的增刪改查

public class HdfsClient {

    FileSystem fs = null;

    @Before
    public void init() throws Exception {

        // 構造一個配置參數對象,設置一個參數:咱們要訪問的hdfs的URI
        // 從而FileSystem.get()方法就知道應該是去構造一個訪問hdfs文件系統的客戶端,以及hdfs的訪問地址
        // new Configuration();的時候,它就會去加載jar包中的hdfs-default.xml
        // 而後再加載classpath下的hdfs-site.xml
        Configuration conf = new Configuration();
        conf.set("fs.defaultFS", "hdfs://hdp-node01:9000");
        /**
         * 參數優先級: 一、客戶端代碼中設置的值 二、classpath下的用戶自定義配置文件 三、而後是服務器的默認配置
         */
        conf.set("dfs.replication", "3");

        // 獲取一個hdfs的訪問客戶端,根據參數,這個實例應該是DistributedFileSystem的實例
        // fs = FileSystem.get(conf);

        // 若是這樣去獲取,那conf裏面就能夠不要配"fs.defaultFS"參數,並且,這個客戶端的身份標識已是hadoop用戶
        fs = FileSystem.get(new URI("hdfs://hdp-node01:9000"), conf, "hadoop");

    }

    /**
     * 往hdfs上傳文件
     * 
     * @throws Exception
     */
    @Test
    public void testAddFileToHdfs() throws Exception {

        // 要上傳的文件所在的本地路徑
        Path src = new Path("g:/redis-recommend.zip");
        // 要上傳到hdfs的目標路徑
        Path dst = new Path("/aaa");
        fs.copyFromLocalFile(src, dst);
        fs.close();
    }

    /**
     * 從hdfs中複製文件到本地文件系統
     * 
     * @throws IOException
     * @throws IllegalArgumentException
     */
    @Test
    public void testDownloadFileToLocal() throws IllegalArgumentException, IOException {
        fs.copyToLocalFile(new Path("/jdk-7u65-linux-i586.tar.gz"), new Path("d:/"));
        fs.close();
    }

    @Test
    public void testMkdirAndDeleteAndRename() throws IllegalArgumentException, IOException {

        // 建立目錄
        fs.mkdirs(new Path("/a1/b1/c1"));

        // 刪除文件夾 ,若是是非空文件夾,參數2必須給值true
        fs.delete(new Path("/aaa"), true);

        // 重命名文件或文件夾
        fs.rename(new Path("/a1"), new Path("/a2"));

    }

    /**
     * 查看目錄信息,只顯示文件
     * 
     * @throws IOException
     * @throws IllegalArgumentException
     * @throws FileNotFoundException
     */
    @Test
    public void testListFiles() throws FileNotFoundException, IllegalArgumentException, IOException {

        // 思考:爲何返回迭代器,而不是List之類的容器
        RemoteIterator<LocatedFileStatus> listFiles = fs.listFiles(new Path("/"), true);

        while (listFiles.hasNext()) {
            LocatedFileStatus fileStatus = listFiles.next();
            System.out.println(fileStatus.getPath().getName());
            System.out.println(fileStatus.getBlockSize());
            System.out.println(fileStatus.getPermission());
            System.out.println(fileStatus.getLen());
            BlockLocation[] blockLocations = fileStatus.getBlockLocations();
            for (BlockLocation bl : blockLocations) {
                System.out.println("block-length:" + bl.getLength() + "--" + "block-offset:" + bl.getOffset());
                String[] hosts = bl.getHosts();
                for (String host : hosts) {
                    System.out.println(host);
                }
            }
            System.out.println("--------------爲angelababy打印的分割線--------------");
        }
    }

    /**
     * 查看文件及文件夾信息
     * 
     * @throws IOException
     * @throws IllegalArgumentException
     * @throws FileNotFoundException
     */
    @Test
    public void testListAll() throws FileNotFoundException, IllegalArgumentException, IOException {

        FileStatus[] listStatus = fs.listStatus(new Path("/"));

        String flag = "d--             ";
        for (FileStatus fstatus : listStatus) {
            if (fstatus.isFile())  flag = "f--         ";
            System.out.println(flag + fstatus.getPath().getName());
        }
    }
}

 經過流的方式訪問hdfs

/**
 * 相對那些封裝好的方法而言的更底層一些的操做方式
 * 上層那些mapreduce   spark等運算框架,去hdfs中獲取數據的時候,就是調的這種底層的api
 * @author
 *
 */
public class StreamAccess {
    
    FileSystem fs = null;

    @Before
    public void init() throws Exception {

        Configuration conf = new Configuration();
        fs = FileSystem.get(new URI("hdfs://hdp-node01:9000"), conf, "hadoop");

    }
    
    
    
    @Test
    public void testDownLoadFileToLocal() throws IllegalArgumentException, IOException{
        
        //先獲取一個文件的輸入流----針對hdfs上的
        FSDataInputStream in = fs.open(new Path("/jdk-7u65-linux-i586.tar.gz"));
        
        //再構造一個文件的輸出流----針對本地的
        FileOutputStream out = new FileOutputStream(new File("c:/jdk.tar.gz"));
        
        //再將輸入流中數據傳輸到輸出流
        IOUtils.copyBytes(in, out, 4096);
        
        
    }
    
    
    /**
     * hdfs支持隨機定位進行文件讀取,並且能夠方便地讀取指定長度
     * 用於上層分佈式運算框架併發處理數據
     * @throws IllegalArgumentException
     * @throws IOException
     */
    @Test
    public void testRandomAccess() throws IllegalArgumentException, IOException{
        //先獲取一個文件的輸入流----針對hdfs上的
        FSDataInputStream in = fs.open(new Path("/iloveyou.txt"));
        
        
        //能夠將流的起始偏移量進行自定義
        in.seek(22);
        
        //再構造一個文件的輸出流----針對本地的
        FileOutputStream out = new FileOutputStream(new File("c:/iloveyou.line.2.txt"));
        
        IOUtils.copyBytes(in,out,19L,true);
        
    }
    
    
    
    /**
     * 顯示hdfs上文件的內容
     * @throws IOException 
     * @throws IllegalArgumentException 
     */
    @Test
    public void testCat() throws IllegalArgumentException, IOException{
        
        FSDataInputStream in = fs.open(new Path("/iloveyou.txt"));
        
        IOUtils.copyBytes(in, System.out, 1024);
    }
}


7.4.3 場景編程
在mapreduce 、spark等運算框架中,有一個核心思想就是將運算移往數據,或者說,就是要在併發計算中儘量讓運算本地化,這就須要獲取數據所在位置的信息並進行相應範圍讀取
如下模擬實現:獲取一個文件的全部block位置信息,而後讀取指定block中的內容
    @Test
    public void testCat() throws IllegalArgumentException, IOException{
        
        FSDataInputStream in = fs.open(new Path("/weblog/input/access.log.10"));
        //拿到文件信息
        FileStatus[] listStatus = fs.listStatus(new Path("/weblog/input/access.log.10"));
        //獲取這個文件的全部block的信息
        BlockLocation[] fileBlockLocations = fs.getFileBlockLocations(listStatus[0], 0L, listStatus[0].getLen());
        //第一個block的長度
        long length = fileBlockLocations[0].getLength();
        //第一個block的起始偏移量
        long offset = fileBlockLocations[0].getOffset();
        
        System.out.println(length);
        System.out.println(offset);
        
        //獲取第一個block寫入輸出流
//        IOUtils.copyBytes(in, System.out, (int)length);
        byte[] b = new byte[4096];
        
        FileOutputStream os = new FileOutputStream(new File("d:/block0"));
        while(in.read(offset, b, 0, 4096)!=-1){
            os.write(b);
            offset += 4096;
            if(offset>=length) return;
        };
        os.flush();
        os.close();
        in.close();
    }
相關文章
相關標籤/搜索