Week 23 of 2018 - Big Data: HDFS Data Flow and Operation Examples

Last week covered the architecture and design of Hadoop's HDFS in broad strokes, along with the deployment process. This week covers the HDFS data flow and some operation examples.

HDFS Data Flow

An HDFS system depends on the following services:
1.NameNode
2.DataNode
3.JournalNode
4.zkfc

JournalNode and zkfc are used for high availability.
Data flows between the client, the NameNode, and the DataNodes.
HDFS can be operated through the following interfaces:
1.HTTP
2.C
3.NFS
4.FUSE
5.Java API
This article focuses on the Java API, specifically the FileSystem class.

Read Data Flow

FileSystem is the Java class Hadoop provides for operating on HDFS. Through it we can act as an HDFS client (besides services we write ourselves, the MapReduce (MR) programs running on the nodes are also clients, and in fact the main and most common ones).
The following is the (read) data flow diagram between the client, the NameNode, and the DataNodes:
[Figure: HDFS read data flow between client, NameNode, and DataNodes]

1. The client opens the file by calling the open() method on the FileSystem object.
2. The open() method, via the DistributedFileSystem object, calls the NameNode over RPC to get the locations of the file's first blocks and the DataNode addresses of their replicas, and returns an FSDataInputStream object.

The DataNodes are sorted by their distance to the client; if the client is itself a DataNode, it reads the data from the local DataNode that holds a replica of the corresponding block.

3. When the read() method of the FSDataInputStream object is called, the DFSInputStream it wraps transfers the data from the DataNode to the client.
4. On reaching the end of a block, DFSInputStream closes the connection to that DataNode and then finds the best DataNode for the next block. All of this is transparent to the client, which from its point of view is simply reading one continuous stream.

If DFSInputStream encounters an error while communicating with a DataNode, it tries to read the block from the next closest DataNode. DFSInputStream remembers the failed DataNode so that it does not needlessly retry that node for later blocks. DFSInputStream also verifies the data read from the DataNode using checksums. If a corrupt block is found, DFSInputStream attempts to read a replica from another DataNode and reports the corrupt block to the NameNode.
5. When the client has finished reading, it calls the close() method.
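From the client's point of view, the whole read path above collapses into open(), read(), and close() calls. The following is a minimal sketch of that sequence (not one of the original examples); the cluster address and file path are the same assumed example values used later in this article, and the commented-out setVerifyChecksum(false) call only marks where the checksum verification described above could be switched off.

package com.jc.demo.hadoop.hdfs;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

import java.net.URI;

/**
 * Minimal sketch of the client-side read sequence described above.
 * The cluster address and path are assumed example values.
 */
public class ReadFlowSketch {

    public static void main(String[] args) throws Exception {
        String uri = "hdfs://s1.jevoncode.com:9000/opt/command/word.txt";
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(URI.create(uri), conf);   // step 1: obtain the client-side FileSystem
//        fs.setVerifyChecksum(false);                           // checksum verification mentioned above is on by default
        FSDataInputStream in = fs.open(new Path(uri));           // step 2: NameNode is asked for block locations over RPC
        byte[] buffer = new byte[4096];
        int n;
        while ((n = in.read(buffer)) > 0) {                      // steps 3-4: data is streamed from the DataNodes
            System.out.write(buffer, 0, n);
        }
        in.close();                                              // step 5: close the stream
    }
}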

Write Data Flow

The following is the (write) data flow diagram between the client, the NameNode, and the DataNodes:
[Figure: HDFS write data flow between client, NameNode, and DataNodes]

1. The client creates the file by calling the create() method on the FileSystem object.
2. The create() method, via the DistributedFileSystem object, calls the NameNode over RPC to create a new file in the filesystem namespace, with no blocks associated with it yet, and returns an FSDataOutputStream object.

The NameNode performs various checks to make sure the file does not already exist and that the client has permission to create it. If the checks pass, the NameNode creates the new file; otherwise file creation fails and an IOException is thrown to the client.

3. When the write() method of the FSDataOutputStream object is called, the data is written through the DFSOutputStream object (DFSOutputStream is wrapped inside FSDataOutputStream).

As the client writes data, DFSOutputStream splits it into packets (DFSPacket) and writes them to an internal queue, the "data queue" (dataQueue, a LinkedList member variable). The DataStreamer consumes the data queue; its job is to ask the NameNode to allocate new blocks by picking a set of DataNodes suitable for storing the replicas. This set of DataNodes forms a pipeline; with a replication factor of 3 there are 3 DataNodes in the pipeline. The DataStreamer streams the packets to the first DataNode in the pipeline, which stores each packet and forwards it to the second DataNode, and so on down to the third.
DFSOutputStream also maintains an internal queue of packets waiting to be acknowledged by the DataNodes, its member variable ackQueue (the "ack queue"). A packet is removed from the ackQueue only after acknowledgements have been received from all DataNodes in the pipeline.
If any DataNode fails while a packet is being written, the following happens: the pipeline is first closed, and all packets in the ackQueue are added back to the front of the dataQueue so that DataNodes downstream of the failed node do not miss any packets. An identifier is passed to the NameNode so that the partial block on the failed DataNode can be deleted if that node recovers later. The failed DataNode is removed from the pipeline, a new pipeline is built from the remaining healthy DataNodes, and the write continues.

4. When the client has finished writing data, it calls close() on the stream, which waits for the NameNode to confirm that the write is complete.
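On the client side, all of this pipeline machinery is hidden behind create(), write(), and close(). The following is a minimal sketch of the write sequence; the file path is an assumed example, and the replication factor and block size are passed explicitly only to make the pipeline size of 3 visible (normally they come from the cluster configuration).

package com.jc.demo.hadoop.hdfs;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

import java.net.URI;

/**
 * Minimal sketch of the client-side write sequence described above.
 * The path is an assumed example; replication = 3 means a pipeline of 3 DataNodes.
 */
public class WriteFlowSketch {

    public static void main(String[] args) throws Exception {
        String uri = "hdfs://s1.jevoncode.com:9000/opt/command/hello.txt";
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(URI.create(uri), conf);
        // steps 1-2: the NameNode creates the file entry in the namespace (no blocks yet)
        FSDataOutputStream out = fs.create(new Path(uri), true, 4096, (short) 3, 128 * 1024 * 1024L);
        out.write("hello hdfs\n".getBytes("UTF-8"));  // step 3: data is packetized and sent through the DataNode pipeline
        out.close();                                  // step 4: wait for the NameNode to confirm the write is complete
    }
}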

HDFS Operation Examples

Command Line

Administration command reference: http://hadoop.apache.org/docs...
File operation command reference: http://hadoop.apache.org/docs... Note that the hadoop fs in that document should be replaced with hdfs dfs.

  • Check the version: hdfs version
[jevoncode@s1 ~]$ hdfs version
Hadoop 2.7.3
Subversion https://git-wip-us.apache.org/repos/asf/hadoop.git -r baa91f7c6bc9cb92be5982de4719c1c8af91ccff
Compiled by root on 2016-08-18T01:41Z
Compiled with protoc 2.5.0
From source with checksum 2e4ce5f957ea4db193bce3734ff29ff4
This command was run using /mydata1/hadoop-2.7.3/share/hadoop/common/hadoop-common-2.7.3.jar
  • View filesystem statistics: hdfs dfsadmin -report
[jevoncode@s1 ~]$ hdfs dfsadmin -report
Configured Capacity: 158127783936 (147.27 GB)
Present Capacity: 148158701568 (137.98 GB)
DFS Remaining: 148158615552 (137.98 GB)
DFS Used: 86016 (84 KB)
DFS Used%: 0.00%
Under replicated blocks: 0
Blocks with corrupt replicas: 0
Missing blocks: 0
Missing blocks (with replication factor 1): 0

-------------------------------------------------
Live datanodes (3):

Name: 192.168.31.181:50010 (s6.jevoncode.com)
Hostname: s6.jevoncode.com
Decommission Status : Normal
Configured Capacity: 52709261312 (49.09 GB)
DFS Used: 28672 (28 KB)
Non DFS Used: 3323027456 (3.09 GB)
DFS Remaining: 49386205184 (45.99 GB)
DFS Used%: 0.00%
DFS Remaining%: 93.70%
Configured Cache Capacity: 0 (0 B)
Cache Used: 0 (0 B)
Cache Remaining: 0 (0 B)
Cache Used%: 100.00%
Cache Remaining%: 0.00%
Xceivers: 1
Last contact: Sun Jun 10 14:00:14 CST 2018

...
  • Create and list directories
[jevoncode@s1 ~]$ hdfs dfs -mkdir /opt/
[jevoncode@s1 ~]$ hdfs dfs -mkdir /opt/command/
[jevoncode@s1 ~]$ hdfs dfs -ls /
Found 1 items
drwxr-xr-x   - jevoncode supergroup          0 2018-06-10 14:05 /opt
  • Copy a local text file to HDFS
hdfs dfs -put sougouword.txt /opt/command/word.txt
  • Copy a file from HDFS to the local filesystem
hdfs dfs -get /opt/command/word.txt sougouword2.txt
  • Delete a file
hdfs dfs -rm /opt/command/word.txt
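Each of the shell commands above has a programmatic equivalent on the FileSystem class introduced in the next section. Below is a hedged sketch of those equivalents (mkdirs, copyFromLocalFile, copyToLocalFile, delete); the local and HDFS paths are assumed example values.

package com.jc.demo.hadoop.hdfs;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

import java.net.URI;

/**
 * Hedged sketch: FileSystem equivalents of the shell commands above.
 * All paths are assumed example values.
 */
public class ShellEquivalents {

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(URI.create("hdfs://s1.jevoncode.com:9000/"), conf);

        fs.mkdirs(new Path("/opt/command/"));                           // hdfs dfs -mkdir (creates parents as needed)
        fs.copyFromLocalFile(new Path("sougouword.txt"),                // hdfs dfs -put
                new Path("/opt/command/word.txt"));
        fs.copyToLocalFile(new Path("/opt/command/word.txt"),           // hdfs dfs -get
                new Path("sougouword2.txt"));
        fs.delete(new Path("/opt/command/word.txt"), false);            // hdfs dfs -rm (false = non-recursive)
        fs.close();
    }
}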

Java API

  • Reading file contents via a URL object
package com.jc.demo.hadoop.hdfs;

import org.apache.hadoop.fs.FsUrlStreamHandlerFactory;
import org.apache.hadoop.io.IOUtils;

import java.io.InputStream;
import java.net.URL;

/**
 * Preparation:
 * [jevoncode@s1 ~]# hdfs dfs -mkdir /opt/
 * [jevoncode@s1 ~]# hdfs dfs -mkdir /opt/command/
 * [jevoncode@s1 ~]# hdfs dfs -put sougouword.txt /opt/command/word.txt
 * <p>
 * <p>
 * Method 1: dynamic argument
 * Commands (after uploading the jar to the Hadoop server):
 * [jevoncode@s1 ~]# export HADOOP_CLASSPATH=jc-demo-hadoop-0.0.1.0-SNAPSHOT-development.jar
 * [jevoncode@s1 ~]# hadoop com.jc.demo.hadoop.hdfs.URLCat hdfs://ns/opt/command/word.txt
 * where ns is the nameservice configured in hdfs-site.xml, used for high availability
 *
 * <p>
 * Method 2: remote access
 * Run the main method directly, using hdfsHost as the argument, to access the cluster remotely
 */
public class URLCat {

    static {
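        // Note: URL.setURLStreamHandlerFactory() can only be called once per JVM, so this
        // approach cannot be combined with other code that also installs a stream handler factory.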
        URL.setURLStreamHandlerFactory(new FsUrlStreamHandlerFactory());
    }

    public static void main(String[] args) throws Exception {
        String hdfsHost = "hdfs://s1.jevoncode.com:9000/opt/command/word.txt";
        InputStream in = null;
        try {
//            in = new URL(args[0]).openStream();  // Method 1: dynamic argument
            in = new URL(hdfsHost).openStream();    // Method 2: remote access
            IOUtils.copyBytes(in, System.out, 4096, false);
        } finally {
            IOUtils.closeStream(in);
        }
    }
}
  • Reading file contents via FileSystem
package com.jc.demo.hadoop.hdfs;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;

import java.io.InputStream;
import java.net.URI;

/**
 * Read file contents using FileSystem.
 * In this example the Configuration object is only passed as a required parameter; the URI still has to be specified in the code.
 *
 */
public class FileSystemCat {

    public static void main(String[] args) throws Exception {
        String hdfsHost = "hdfs://s1.jevoncode.com:9000/opt/command/word.txt";
        String uri = hdfsHost;
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(URI.create(uri), conf);
        InputStream in = null;
        try {
            in = fs.open(new Path(uri));
            IOUtils.copyBytes(in, System.out, 4096, false);
        } finally {
            IOUtils.closeStream(in);
        }
    }
}
  • FSDataInputStream also supports random access reads
package com.jc.demo.hadoop.hdfs;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;

import java.net.URI;

/**
 * Random access reads using FSDataInputStream (seek back and re-read)
 */
public class FileSystemDoubleCat {

    public static void main(String[] args) throws Exception {
        String hdfsHost = "hdfs://s1.jevoncode.com:9000/opt/command/word.txt";
        String uri = hdfsHost;
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(URI.create(uri), conf);
        FSDataInputStream in = null;
        try {
            in = fs.open(new Path(uri));
            IOUtils.copyBytes(in, System.out, 4096, false);
            in.seek(0); // go back to the start of the file
            IOUtils.copyBytes(in, System.out, 4096, false);
        } finally {
            IOUtils.closeStream(in);
        }
    }
}
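Besides seek(), FSDataInputStream also implements the PositionedReadable interface, so bytes can be read from an arbitrary offset without moving the stream's current position. A hedged sketch along the same lines as the example above (same assumed cluster address and file):

package com.jc.demo.hadoop.hdfs;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;

import java.net.URI;

/**
 * Hedged sketch: positioned read via FSDataInputStream (PositionedReadable).
 * Reads a few bytes from a given offset without changing the stream position.
 */
public class FileSystemPositionedReadCat {

    public static void main(String[] args) throws Exception {
        String uri = "hdfs://s1.jevoncode.com:9000/opt/command/word.txt";
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(URI.create(uri), conf);
        FSDataInputStream in = null;
        try {
            in = fs.open(new Path(uri));
            byte[] buffer = new byte[16];
            // read up to 16 bytes starting at offset 0; the current stream position is untouched
            int read = in.read(0L, buffer, 0, buffer.length);
            if (read > 0) {
                System.out.write(buffer, 0, read);
            }
        } finally {
            IOUtils.closeStream(in);
        }
    }
}

Note that seek() and positioned reads are relatively expensive operations and should be used sparingly; HDFS is optimized for streaming reads.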
  • Copying a file with FileSystem (writing); parent directories are created automatically
package com.jc.demo.hadoop.hdfs;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.util.Progressable;

import java.io.BufferedInputStream;
import java.io.FileInputStream;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.URI;

/**
 * Copy a file to HDFS with FileSystem (writing); parent directories are created automatically.
 * FileSystem can also operate on the local filesystem, so without a scheme it would write to the local directory /opt/java.
 */
public class FileCopyWithProgress {
    public static void main(String[] args) throws Exception {
        String localSrc = "/home/cherry/Downloads/鬥破蒼穹.txt";
        String dst = "hdfs://s1.jevoncode.com:9000/opt/java/鬥破蒼穹.txt";
//        String dst = "/opt/java/"; // FileSystem can also operate on the local filesystem; without a scheme this writes to the local directory /opt/java
        InputStream in = new BufferedInputStream(new FileInputStream(localSrc));
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(URI.create(dst), conf);
        OutputStream out = fs.create(new Path(dst), new Progressable() {
            public void progress() {
                System.out.print(".");
            }
        });
        IOUtils.copyBytes(in, out, 4096, true);
    }
}
  • Listing directories (one level only)
package com.jc.demo.hadoop.hdfs;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;

import java.net.URI;

/**
 * List the files and directories under the given paths (one level only)
 */
public class ListStatus {

    public static void main(String[] args) throws Exception {
        String hdfsHost = "hdfs://s1.jevoncode.com:9000/";
        String uri = hdfsHost;
        args = new String[]{"/opt/", "/dir"};
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(URI.create(uri), conf);
        Path[] paths = new Path[args.length];
        for (int i = 0; i < paths.length; i++) {
            paths[i] = new Path(args[i]);
        }
        FileStatus[] status = fs.listStatus(paths);
        Path[] listedPaths = FileUtil.stat2Paths(status);       // convert the FileStatus array into a Path array
        for (Path p : listedPaths) {
            System.out.println(p);
        }
    }
}
/**
 * output:
 * 06-10 14:29:16 [main] DEBUG o.a.h.s.a.util.KerberosName - Kerberos krb5 configuration not found, setting default realm to empty
 * 06-10 14:29:16 [main] DEBUG o.a.hadoop.util.PerformanceAdvisory - Falling back to shell based
 * 06-10 14:29:18 [main] DEBUG o.a.hadoop.util.PerformanceAdvisory - Both short-circuit local reads and UNIX domain socket are disabled.
 * 06-10 14:29:18 [main] DEBUG o.a.h.h.p.d.s.DataTransferSaslUtil - DataTransferProtocol not using SaslPropertiesResolver, no QOP found in configuration for dfs.data.transfer.protection
 * hdfs://s1.jevoncode.com:9000/opt/command
 * hdfs://s1.jevoncode.com:9000/opt/java
 */
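As an alternative to passing explicit paths to listStatus(), FileSystem also supports file-pattern matching through globStatus(). A hedged sketch, assuming the same /opt layout as the output above:

package com.jc.demo.hadoop.hdfs;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;

import java.net.URI;

/**
 * Hedged sketch: pattern matching with globStatus() instead of listing explicit paths.
 */
public class GlobStatus {

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(URI.create("hdfs://s1.jevoncode.com:9000/"), conf);
        // match every entry directly under /opt, e.g. /opt/command and /opt/java
        FileStatus[] status = fs.globStatus(new Path("/opt/*"));
        for (Path p : FileUtil.stat2Paths(status)) {
            System.out.println(p);
        }
    }
}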
  • Getting file information (modification time, owner, etc.)
package com.jc.demo.hadoop.hdfs;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.OutputStream;
import java.net.URI;

import static org.hamcrest.Matchers.lessThanOrEqualTo;
import static org.hamcrest.core.Is.is;
import static org.junit.Assert.assertThat;

/**
 * Get file status
 */
public class ShowFileStatusTest {
    private FileSystem fs;

    @Before
    public void setUp() throws IOException {

        String hdfsHost = "hdfs://s1.jevoncode.com:9000/";
        String uri = hdfsHost;
        Configuration conf = new Configuration();
        fs = FileSystem.get(URI.create(uri), conf);

        OutputStream out = fs.create(new Path("/dir/file"));
        out.write("content".getBytes("UTF-8"));
        out.close();
    }

    @After
    public void tearDown() throws IOException {
        if (fs != null) {
            fs.close();
        }
    }

    @Test(expected = FileNotFoundException.class)
    public void throwsFileNotFoundForNonExistentFile() throws IOException {
        fs.getFileStatus(new Path("no-such-file"));
    }

    /**
     * Test the file status of a regular file
     * @throws IOException
     * @throws InterruptedException
     */
    @Test
    public void fileStatusForFile() throws IOException, InterruptedException {
        Path file = new Path("/dir/file");
        FileStatus stat = fs.getFileStatus(file);
        assertThat(stat.getPath().toUri().getPath(), is("/dir/file"));                      // the path should be /dir/file
        assertThat(stat.isDirectory(), is(false));                                          // not a directory
        assertThat(stat.getLen(), is(7L));                                                  // file length ("content" is 7 bytes)
        Thread.sleep(3000);                                                                 // wait so the modification time is strictly earlier than the check below
        assertThat(stat.getModificationTime(), is(lessThanOrEqualTo(System.currentTimeMillis())));// modification time should be no later than now
        assertThat(stat.getReplication(), is((short) 3));                                         // replication factor
        assertThat(stat.getBlockSize(), is(128 * 1024 * 1024L));                            // block size (128 MB)
        assertThat(stat.getOwner(), is(System.getProperty("user.name")));                         // the current user should be the owner
        assertThat(stat.getGroup(), is("supergroup"));                                      // group check
        assertThat(stat.getPermission().toString(), is("rw-r--r--"));                       // permissions check
    }

    @Test
    public void fileStatusForDirectory() throws IOException {
        Path dir = new Path("/dir");
        FileStatus stat = fs.getFileStatus(dir);
        assertThat(stat.getPath().toUri().getPath(), is("/dir"));
        assertThat(stat.isDirectory(), is(true));
        assertThat(stat.getLen(), is(0L));
        assertThat(stat.getModificationTime(), is(lessThanOrEqualTo(System.currentTimeMillis())));
        assertThat(stat.getReplication(), is((short) 0));
        assertThat(stat.getBlockSize(), is(0L));
        assertThat(stat.getOwner(), is(System.getProperty("user.name")));
        assertThat(stat.getGroup(), is("supergroup"));
        assertThat(stat.getPermission().toString(), is("rwxr-xr-x"));
    }

}