HDFS詳解

HDFS基本概念java

一、HDFS設計思想node

分而治之:將大文件、大批量文件,分佈式存放在大量服務器上,以便於採起分而治之的方式對海量數據進行運算分析linux

二、概念和特性web

概念:HDFS是一個分佈式文件系統shell

特性:apache

(1)HDFS中的文件在物理上是分塊存儲(block,塊的大小能夠經過配置參數( dfs.blocksize)來規定,默認大小在hadoop2.x版本中是128M,老版本中是64M編程

(2)HDFS文件系統會給客戶端提供一個統一的抽象目錄樹,客戶端經過路徑來訪問文件,形如:hdfs://namenode:port/dir-a/dir-b/dir-c/file.datawindows

(3)目錄結構及文件分塊信息(元數據)的管理由namenode節點承擔api

——namenodeHDFS集羣主節點,負責維護整個hdfs文件系統的目錄樹,以及每個路徑(文件)所對應的block塊信息(blockid,及所在的datanode服務器)緩存

(4)文件的各個block的存儲管理由datanode節點承擔

---- datanodeHDFS集羣從節點,每個block均可以在多個datanode上存儲多個副本(副本數量也能夠經過參數設置dfs.replication

(5)HDFS是設計成適應一次寫入,屢次讀出的場景,且不支持文件的修改

(注:適合用來作數據分析,並不適合用來作網盤應用,由於,不便修改,延遲大,網絡開銷大,成本過高)


 

HDFS基本操做(shell操做)

一、HDFS命令行客戶端使用

二、命令行客戶端支持的命令參數

三、經常使用命令參數介紹

-help             

功能:輸出這個命令參數手冊

hadoop fs -help

-ls                  

功能:顯示目錄信息

示例: hadoop fs -ls hdfs://hadoop1:9000/

備註:這些參數中,全部的hdfs路徑均可以簡寫

-->hadoop fs -ls /   等同於上一條命令的效果hadoop fs -ls -R / 會列出全部嵌套文件 

-mkdir              

功能:在hdfs上建立目錄

示例:hadoop fs  -mkdir  -p  /aaa/bbb/cc/dd

-moveFromLocal            

功能:從本地剪切粘貼到hdfs

示例:hadoop  fs  - moveFromLocal  /home/hadoop/a.txt  /aaa/bbb/cc/dd

-moveToLocal              

功能:從hdfs剪切粘貼到本地

示例:hadoop  fs  - moveToLocal   /aaa/bbb/cc/dd  /home/hadoop/a.txt 

--appendToFile  

功能:追加一個文件到已經存在的文件末尾

示例:hadoop  fs  -appendToFile  ./hello.txt  hdfs://hadoop-server01:9000/hello.txt

能夠簡寫爲:

hadoop  fs  -appendToFile  ./hello.txt  /hello.txt

 

-cat  

功能:顯示文件內容  

示例:hadoop fs -cat  /hello.txt

 

-tail                 

功能:顯示一個文件的末尾

示例:hadoop  fs  -tail  /weblog/access_log.1

-text                  

功能:以字符形式打印一個文件的內容

示例:hadoop  fs  -text  /weblog/access_log.1

-chgrp

-chmod

-chown

功能:linux文件系統中的用法同樣,對文件所屬權限

示例:

hadoop  fs  -chmod  666  /hello.txt

hadoop  fs  -chown  someuser:somegrp   /hello.txt

-copyFromLocal    

功能:從本地文件系統中拷貝文件到hdfs路徑去

示例:hadoop  fs  -copyFromLocal  ./jdk.tar.gz  /aaa/

-copyToLocal      

功能:從hdfs拷貝到本地

示例:hadoop fs -copyToLocal /aaa/jdk.tar.gz   /

-cp              

功能:從hdfs的一個路徑拷貝hdfs的另外一個路徑

示例: hadoop  fs  -cp  /aaa/jdk.tar.gz  /bbb/jdk.tar.gz.2

 

-mv                     

功能:在hdfs目錄中移動文件

示例: hadoop  fs  -mv  /aaa/jdk.tar.gz  /

-get              

功能:等同於copyToLocal,就是從hdfs下載文件到本地

示例:hadoop fs  -get  /aaa/jdk.tar.gz   /

-getmerge             

功能:合併下載多個文件

示例:好比hdfs的目錄 /aaa/下有多個文件:log.1, log.2,log.3,...

hadoop fs -getmerge  /aaa/log.*   ./log.sum

-put                

功能:等同於copyFromLocal

示例:hadoop  fs  -put  /aaa/jdk.tar.gz  /bbb/jdk.tar.gz.2

 

-rm                

功能:刪除文件或文件夾

示例:hadoop fs -rm -r /aaa/bbb/

 

-rmdir                 

功能:刪除空目錄

示例:hadoop  fs  -rmdir   /aaa/bbb/ccc

-df               

功能:統計文件系統的可用空間信息

示例:hadoop  fs  -df  -h  /

 

-du

功能:統計文件夾的大小信息

示例:

hadoop  fs  -du  -s  -h /aaa/*

 

-count         

功能:統計一個指定目錄下的文件節點數量

示例:hadoop  fs  -count  /aaa/

 

-setrep                

功能:設置hdfs中文件的副本數量

示例:hadoop fs -setrep 3 /aaa/jdk.tar.gz

<這裏設置的副本數只是記錄在namenode的元數據中,是否真的會有這麼多副本,還得看datanode的數量>

 

 

 


 

HDFS原理

一、HDFS的工做機制

1.1概述

  1. HDFS集羣分爲兩大角色:NameNodeDataNode
  2. NameNode負責管理整個文件系統的元數據
  3. DataNode 負責管理用戶的文件數據塊
  4. 文件會按照固定的大小(blocksize)切成若干塊後分布式存儲在若干臺datanode
  5. 每個文件塊能夠有多個副本,並存放在不一樣的datanode
  6. Datanode會按期向Namenode彙報自身所保存的文件block信息,而namenode則會負責保持文件的副本數量
  7. HDFS的內部工做機制對客戶端保持透明,客戶端請求訪問HDFS都是經過向namenode申請來進行

1.2HDFS寫數據流程

1.2.1概述

 

客戶端要向HDFS寫數據,首先要跟namenode通訊以確承認以寫文件並得到接收文件blockdatanode,而後,客戶端按順序將文件逐個block傳遞給相應datanode,並由接收到blockdatanode負責向其餘datanode複製block的副本

1.2.2詳細步驟圖(上傳文件)

 

1.2.3詳細步驟解析

1、client跟namenode通訊,請求上傳文件,namenode檢查目錄樹中目標文件是否已存在?父目錄是否存在?

2namenode返回是否能夠上傳

3client請求第一個block該傳輸到哪些datanode服務器上

4namenode查詢DataNode信息,而後返回3個可用的datanode服務器ABC給client

5client請求3臺DataNode中的一臺A上傳數據(本質上是一個RPC調用,創建pipeline),A收到請求會繼續調用B,而後B調用C,將整個pipeline創建完成,逐級返回客戶端

6client開始往A上傳第一個block(先從磁盤讀取數據放到一個本地內存緩存),以packet爲單位,A收到一個packet就會傳給BB傳給CA每傳一個packet會放入一個應答隊列等待應答

7、當一個block傳輸完成以後,client再次請求namenode上傳第二個block的服務器。

 

1.3HDFS讀數據流程

1.3.1概述

客戶端將要讀取的文件路徑發送給namenodenamenode獲取文件的元信息(主要是block的存放位置信息)返回給客戶端,客戶端根據返回的信息找到相應datanode逐個獲取文件的block並在客戶端本地進行數據追加合併從而得到整個文件

1.3.2詳細步驟圖(下載文件)

 

1.3.3詳細步驟解析

1、client跟namenode通訊請求讀取文件,namenode查詢元數據,找到文件塊所在的datanode服務器,而後將每一個文件塊所在的DataNode服務器返回給client

2、挑選一臺datanode(就近原則,而後隨機)服務器,請求創建socket

3datanode開始發送數據(從磁盤裏面讀取數據放入流,以packet爲單位來作校驗)

4、客戶端以packet爲單位接收,先在本地緩存,而後寫入目標文件

 

二、NameNode工做機制

2.1namenode職責

  負責客戶端青請求的響應

  元數據的管理(查詢、修改)

2.2元數據管理

namenode對數據的管理採用了三種存儲形式:

  內存元數據(NameSystem)

  磁盤元數據鏡像文件

  數據操做日誌文件(可經過日誌運算出元數據)

2.2.1元數據存儲機制

A、內存中有一份完整的元數據(內存meta data)

B、磁盤有一個「準完整」的元數據鏡像(fsimage)文件(namenode的工做目錄中)

C、用於銜接內存metadata和持久化元數據鏡像fsimage之間的操做日誌(edits文件

注:當客戶端對hdfs中的文件進行新增或者修改操做,操做記錄首先被記入edits日誌文件中,當客戶端操做成功後,相應的元數據會更新到內存meta.data

2.2.2元數據手動查看

能夠經過hdfs的一個工具來查看edits中的信息

  bin/hdfs oev -i edits -o edits.xml

  bin/hdfs oiv -i fsimage_0000000000000000087 -p XML -o fsimage.xml

2.2.3元數據的checkpoint

每隔一段時間,會由secondary namenodenamenode上積累的全部edits和一個最新的fsimage下載到本地,並加載到內存進行merge(這個過程稱爲checkpoint

checkpoint的詳細過程

checkpoint操做的觸發條件配置參數

dfs.namenode.checkpoint.check.period=60  #檢查觸發條件是否知足的頻率,60

dfs.namenode.checkpoint.dir=file://${hadoop.tmp.dir}/dfs/namesecondary

#以上兩個參數作checkpoint操做時,secondary namenode的本地工做目錄

dfs.namenode.checkpoint.edits.dir=${dfs.namenode.checkpoint.dir}

 

dfs.namenode.checkpoint.max-retries=3  #最大重試次數

dfs.namenode.checkpoint.period=3600  #兩次checkpoint之間的時間間隔3600秒

dfs.namenode.checkpoint.txns=1000000 #兩次checkpoint之間最大的操做記錄

checkpoint的附帶做用

namenodesecondary namenode的工做目錄存儲結構徹底相同,因此,當namenode故障退出須要從新恢復時,能夠從secondary namenode的工做目錄中將fsimage拷貝到namenode的工做目錄,以恢復namenode的元數據

 

三、DataNode工做機制

3.1 概述

1Datanode工做職責:

存儲管理用戶的文件塊數據

按期向namenode彙報自身所持有的block信息(經過心跳信息上報)

(這點很重要,由於,當集羣中發生某些block副本失效時,集羣如何恢復block初始副本數量的問題

 

<property>

<name>dfs.blockreport.intervalMsec</name>

<value>3600000</value>

<description>Determines block reporting interval in milliseconds.</description>

</property>

 

2Datanode掉線判斷時限參數

datanode進程死亡或者網絡故障形成datanode沒法與namenode通訊,namenode不會當即把該節點斷定爲死亡,要通過一段時間,這段時間暫稱做超時時長。HDFS默認的超時時長爲10分鐘+30秒。若是定義超時時間爲timeout,則超時時長的計算公式爲:

timeout  = 2 * heartbeat.recheck.interval + 10 * dfs.heartbeat.interval

而默認的heartbeat.recheck.interval 大小爲5分鐘,dfs.heartbeat.interval默認爲3秒。

須要注意的是hdfs-site.xml 配置文件中的heartbeat.recheck.interval的單位爲毫秒,dfs.heartbeat.interval的單位爲秒。因此,舉個例子,若是heartbeat.recheck.interval設置爲5000(毫秒),dfs.heartbeat.interval設置爲3(秒,默認),則總的超時時間爲40秒。

 

 

<property>

        <name>heartbeat.recheck.interval</name>

        <value>2000</value>

</property>

<property>

        <name>dfs.heartbeat.interval</name>

        <value>1</value>

</property>

 

3.2 觀察驗證DATANODE功能

上傳一個文件,觀察文件的block具體的物理存放狀況:

在每一臺datanode機器上的這個目錄中能找到文件的切塊:

/usr/local/hadoop/tmp/dfs/data/current/BP-193442119-192.168.2.120-1432457733977/current/finalized

 


 

HDFS應用開發(Java操做)

一、搭建開發環境

1.1引入依賴

  手動引入jar包,hdfs的jar包位於hadoop安裝目錄的share文件夾下。

  建立一個hdfsjar用戶類庫,而後將common和hdfs兩個文件夾中的jar包所有添加進去。

1.2Windows下開發的說明

  建議在Linux下進行hadoop應用的開發,這樣不會存在兼容性問題。

  若是在Windows上作客戶端應用開發,須要設置一下環境:

      A、windows的某個目錄下解壓一個hadoop的安裝  d:\hadoop-2.6.4

      B、將安裝包下的libbin目錄用對應windows版本平臺編譯的本地庫替換

      C、window系統中配置HADOOP_HOME指向你解壓的安裝包

            HADOOP_HOME=d:\hadoop-2.6.4

      D、windows系統的path變量中加入hadoopbin目錄

            PATH=d:\hadoop-2.6.4\bin

二、獲取API中的客戶端對象

java中操做hdfs,首先要得到一個客戶端實例

Configuration conf = new Configuration()

FileSystem fs = FileSystem.get(conf)

 

而咱們的操做目標是HDFS,因此獲取到的fs對象應該是DistributedFileSystem的實例;

get方法是從何處判斷具體實例化那種客戶端類呢?

——從conf中的一個參數 fs.defaultFS的配置值判斷;

若是咱們的代碼中沒有指定fs.defaultFS,而且工程classpath下也沒有給定相應的配置,conf中的默認值就來自於hadoopjar包中的core-default.xml,默認值爲: file:///,則獲取的將不是一個DistributedFileSystem的實例,而是一個本地文件系統的客戶端對象

三、DistributedFileSystem實例對象所具有的方法

 

四、hdfs客戶端操做數據代碼

4.1文件的增刪改查

  1 package com.ahu.bigdata.javaclient;
  2 
  3 import java.net.URI;
  4 import org.apache.hadoop.conf.Configuration;
  5 import org.apache.hadoop.fs.BlockLocation;
  6 import org.apache.hadoop.fs.FileStatus;
  7 import org.apache.hadoop.fs.FileSystem;
  8 import org.apache.hadoop.fs.LocatedFileStatus;
  9 import org.apache.hadoop.fs.Path;
 10 import org.apache.hadoop.fs.RemoteIterator;
 11 
 12 /**
 13  * 使用hdfsJava客戶端實現文件的增刪改查
 14  * 
 15  * @author ahu_lichang
 16  * 
 17  */
 18 public class HdfsClientDemo {
 19     static FileSystem fs = null;
 20 
 21     public static void init() throws Exception {
 22         // 構造一個配置參數對象,設置一個參數:咱們要訪問的hdfs的URI
 23         // 從而FileSystem.get()方法就知道應該是去構造一個訪問hdfs文件系統的客戶端,以及hdfs的訪問地址
 24         // new Configuration();的時候,它就會去加載jar包中的hdfs-default.xml
 25         // 而後再加載classpath下的hdfs-site.xml
 26         Configuration configuration = new Configuration();
 27         // configuration.set("fs.defaultFS", "hdfs://hadoop1:9000");
 28         /*
 29          * 參數優先級:一、客戶端代碼中設置的值 二、classpath下的用戶自定義配置文件 三、服務器的默認設置
 30          */
 31         // configuration.set("dfs.replication", "3");
 32         // 獲取一個hdfs的訪問客戶端,根據參數,這個實例應該是DistributedFileSystem的實例
 33         // fs=FileSystem.get(configuration);
 34         // 若是這樣去獲取,那configuration裏面就能夠不要配"fs.defaultFS"參數,並且,這個客戶端的身份標識已是root用戶
 35         fs = FileSystem.get(new URI("hdfs://hadoop1:9000"), configuration,
 36                 "root");
 37     }
 38 
 39     /**
 40      * 向hdfs上傳文件
 41      * 
 42      * @throws Exception
 43      */
 44     public static void testAddFileToHdfs() throws Exception {
 45         // 本地路徑
 46         Path src = new Path("E:/access.log");
 47         // 目標路徑
 48         Path dst = new Path("/access.log.copy");
 49         fs.copyFromLocalFile(src, dst);
 50         fs.close();
 51     }
 52 
 53     /**
 54      * 從hdfs中複製文件到本地文件系統
 55      * 
 56      * @throws Exception
 57      */
 58     public static void testDownloadFileToLocal() throws Exception {
 59         fs.copyToLocalFile(new Path("/access.log.copy"), new Path("E:/"));
 60         fs.close();
 61     }
 62 
 63     /**
 64      * 在hdfs中建立目錄、刪除文件夾、重命名文件或文件夾
 65      * 
 66      * @throws Exception
 67      */
 68     public static void testMkdirAndDeleteAndRename() throws Exception {
 69         // 建立目錄
 70         fs.mkdirs(new Path("/a1/b1/c1"));
 71         // 刪除文件夾,若是是非空文件夾,參數2必須給值true
 72         fs.delete(new Path("/access.log.copy"), true);
 73         // 重命名文件或文件夾
 74         fs.rename(new Path("/a1"), new Path("/a2"));
 75     }
 76 
 77     /**
 78      * 查看目錄信息,只顯示文件
 79      * 
 80      * @throws Exception
 81      */
 82     public static void testListFiles() throws Exception {
 83         RemoteIterator<LocatedFileStatus> listFiles = fs.listFiles(
 84                 new Path("/"), true);
 85         while (listFiles.hasNext()) {
 86             LocatedFileStatus fileStatus = listFiles.next();
 87             System.out.println(fileStatus.getPath().getName());// job
 88             System.out.println(fileStatus.getBlockSize());// 塊大小(都同樣大)
 89             System.out.println(fileStatus.getPermission());// 權限
 90             System.out.println(fileStatus.getLen());// 塊大小
 91             BlockLocation[] blockLocations = fileStatus.getBlockLocations();
 92             for (BlockLocation blockLocation : blockLocations) {
 93                 System.out.println("block-length:" + blockLocation.getLength()
 94                         + "---" + "block-offset" + blockLocation.getOffset());// 塊長度、塊的起始偏移量
 95                 String[] hosts = blockLocation.getHosts();
 96                 for (String host : hosts) {// 塊分佈在哪些主機上
 97                     System.out.println(host);
 98                 }
 99             }
100             System.out
101                     .println("----------------------------------------------------------");
102         }
103     }
104 
105     /**
106      * 查看文件及文件夾信息(跟hadoop fs -ls /查詢的效果差很少)
107      * 
108      * @throws Exception
109      */
110     public static void testListAll() throws Exception {
111         FileStatus[] listStatus = fs.listStatus(new Path("/"));
112         String flag = "d--         ";
113         for (FileStatus fileStatus : listStatus) {
114             if (fileStatus.isFile())
115                 flag = "f--         ";
116             System.out.println(flag + fileStatus.getPath().getName());
117         }
118     }
119 
120     public static void main(String[] args) throws Exception {
121         init();
122         // testAddFileToHdfs();
123         // testDownloadFileToLocal();
124         // testMkdirAndDeleteAndRename();
125         // testListFiles();
126         testListAll();
127     }
128 }

 

4.2經過流的方式訪問hdfs和場景編程

  1 package com.ahu.bigdata.javaclient;
  2 
  3 import java.io.File;
  4 import java.io.FileOutputStream;
  5 import java.io.IOException;
  6 import java.net.URI;
  7 import org.apache.hadoop.conf.Configuration;
  8 import org.apache.hadoop.fs.BlockLocation;
  9 import org.apache.hadoop.fs.FSDataInputStream;
 10 import org.apache.hadoop.fs.FileStatus;
 11 import org.apache.hadoop.fs.FileSystem;
 12 import org.apache.hadoop.fs.Path;
 13 import org.apache.hadoop.io.IOUtils;
 14 
 15 /**
 16  * 經過流的方式訪問hdfs
 17  * 
 18  * 相對於那些封裝好的方法而言的,是更底層的一些操做方式
 19  * 
 20  * 上層的那些mapreduce、spark等運算框架,去hdfs中獲取數據的時候,就是調用這種底層的api
 21  * 
 22  * @author ahu_lichang
 23  * 
 24  */
 25 public class StreamAccess {
 26     static FileSystem fs = null;
 27 
 28     public static void init() throws Exception {
 29         Configuration configuration = new Configuration();
 30         fs = FileSystem.get(new URI("hdfs://hadoop1:9000"), configuration,
 31                 "root");
 32     }
 33 
 34     /**
 35      * 將hdfs中的文件下載到本地
 36      * 
 37      * @throws Exception
 38      */
 39     public static void testDownloadFileToLocal() throws Exception {
 40         // 先獲取一個文件的輸入流---針對hdfs
 41         FSDataInputStream inputStream = fs.open(new Path(
 42                 "/wordcount/output/part-r-00000"));
 43         // 再構造一個文件的輸出流---針對本地
 44         FileOutputStream outputStream = new FileOutputStream(new File(
 45                 "E:/part-r-00000"));
 46         // 將輸入流中數據傳輸到輸出流中
 47         IOUtils.copyBytes(inputStream, outputStream, 4096);// 4096緩衝區大小
 48     }
 49 
 50     /**
 51      * hdfs支持隨機定位進行文件讀取,並且能夠方便地讀取指定長度
 52      * 
 53      * 用於上層分佈式運算框架併發處理數據
 54      * 
 55      * @throws Exception
 56      */
 57     public static void testRandomAccess() throws Exception {
 58         // 先獲取一個文件的輸入流---針對hdfs
 59         FSDataInputStream inputStream = fs.open(new Path(
 60                 "/wordcount/output/part-r-00000"));
 61         // 能夠將輸入流的起始偏移量進行自定義
 62         inputStream.seek(200);
 63         // 再構造一個文件的輸出流---針對本地
 64         FileOutputStream outputStream = new FileOutputStream(new File(
 65                 "E:/part-r-custom"));
 66         IOUtils.copyBytes(inputStream, outputStream, 4096L, true);// true表示傳輸完畢關閉流
 67     }
 68 
 69     /**
 70      * 顯示hdfs文件上的內容
 71      * 
 72      * @throws Exception
 73      */
 74     public static void testCat() throws Exception {
 75         FSDataInputStream inputStream = fs.open(new Path(
 76                 "/wordcount/output/part-r-00000"));
 77         IOUtils.copyBytes(inputStream, System.out, 1024);
 78     }
 79 
 80     /**
 81      * 場景編程:獲取一個文件的全部block位置信息,而後讀取指定block中的內容
 82      * 
 83      * @throws IOException
 84      * @throws IllegalArgumentException
 85      */
 86     public static void testBlockCat() throws Exception {
 87         FSDataInputStream inputStream = fs.open(new Path(
 88                 "/wordcount/output/part-r-00000"));
 89         // 拿到文件信息
 90         FileStatus[] listStatus = fs.listStatus(new Path(
 91                 "/wordcount/output/part-r-00000"));
 92         // 獲取這個文件的全部block的信息
 93         BlockLocation[] fileBlockLocations = fs.getFileBlockLocations(
 94                 listStatus[0], 0L, listStatus[0].getLen());
 95         // 第一個block的長度
 96         long length = fileBlockLocations[0].getLength();
 97         // 第一個block的起始偏移量
 98         long offset = fileBlockLocations[0].getOffset();
 99         System.out.println(length);
100         System.out.println(offset);
101         // 獲取第一個block寫入輸出流
102         byte[] b = new byte[4096];
103         FileOutputStream fos = new FileOutputStream(new File("E:/block0"));
104         while (inputStream.read(offset, b, 0, 4096) != -1) {
105             fos.write(b);
106             offset += 4096;
107             if (offset >= length)
108                 return;
109         }
110         fos.flush();
111         fos.close();
112         inputStream.close();
113     }
114 
115     public static void main(String[] args) throws Exception {
116         init();
117         // testDownloadFileToLocal();
118         // testRandomAccess();
119         // testCat();
120         testBlockCat();
121     }
122 
123 }
相關文章
相關標籤/搜索