一.Hdfs簡介java
hdfs是一個文件系統,用於存儲文件,經過統一的命名空間——目錄樹來定位文件,而且是分佈式的,由不少服務器聯合起來實現其功能,集羣中的服務器各自負責角色;node
重要特徵:linux
1.HDFS中的文件在物理上是分塊存儲(block),塊的大小能夠經過配置參數( dfs.blocksize)來規定,默認大小在hadoop2.x版本中是128M,老版本中是64Mweb
2.HDFS文件系統會給客戶端提供一個統一的抽象目錄樹,客戶端經過路徑來訪問文件,形如:hdfs://namenode:port/dir-a/dir-b/dir-c/file.dataredis
3.目錄結構及文件分塊信息(元數據)的管理由namenode節點承擔——namenode是HDFS集羣主節點,負責維護整個hdfs文件系統的目錄樹,以及每個路徑(文件)所對應的block塊信息(block的id,及所在的datanode服務器)shell
4.文件的各個block的存儲管理由datanode節點承擔---- datanode是HDFS集羣從節點,每個block均可以在多個datanode上存儲多個副本(副本數量也能夠經過參數設置dfs.replication)apache
5.HDFS是設計成適應一次寫入,屢次讀出的場景,且不支持文件的修改windows
二.Hdfs的shell(命令行客戶端)操做api
Hdfs提供shell命令行客戶端,使用方法以下:緩存
經常使用命令參數介紹:
-help 功能:輸出這個命令參數手冊 |
-ls 功能:顯示目錄信息 示例: hadoop fs -ls hdfs://hadoop-server01:9000/ 備註:這些參數中,全部的hdfs路徑均可以簡寫 -->hadoop fs -ls / 等同於上一條命令的效果 |
-mkdir 功能:在hdfs上建立目錄 示例:hadoop fs -mkdir -p /aaa/bbb/cc/dd |
-moveFromLocal 功能:從本地剪切粘貼到hdfs 示例:hadoop fs - moveFromLocal /home/hadoop/a.txt /aaa/bbb/cc/dd -moveToLocal 功能:從hdfs剪切粘貼到本地 示例:hadoop fs - moveToLocal /aaa/bbb/cc/dd /home/hadoop/a.txt |
--appendToFile 功能:追加一個文件到已經存在的文件末尾 示例:hadoop fs -appendToFile ./hello.txt hdfs://hadoop-server01:9000/hello.txt 能夠簡寫爲: Hadoop fs -appendToFile ./hello.txt /hello.txt
|
-cat 功能:顯示文件內容 示例:hadoop fs -cat /hello.txt
-tail 功能:顯示一個文件的末尾 示例:hadoop fs -tail /weblog/access_log.1 -text 功能:以字符形式打印一個文件的內容 示例:hadoop fs -text /weblog/access_log.1 |
-chgrp -chmod -chown 功能:linux文件系統中的用法同樣,對文件所屬權限 示例: hadoop fs -chmod 666 /hello.txt hadoop fs -chown someuser:somegrp /hello.txt |
-copyFromLocal 功能:從本地文件系統中拷貝文件到hdfs路徑去 示例:hadoop fs -copyFromLocal ./jdk.tar.gz /aaa/ -copyToLocal 功能:從hdfs拷貝到本地 示例:hadoop fs -copyToLocal /aaa/jdk.tar.gz |
-cp 功能:從hdfs的一個路徑拷貝hdfs的另外一個路徑 示例: hadoop fs -cp /aaa/jdk.tar.gz /bbb/jdk.tar.gz.2
-mv 功能:在hdfs目錄中移動文件 示例: hadoop fs -mv /aaa/jdk.tar.gz / |
-get 功能:等同於copyToLocal,就是從hdfs下載文件到本地 示例:hadoop fs -get /aaa/jdk.tar.gz -getmerge 功能:合併下載多個文件 示例:好比hdfs的目錄 /aaa/下有多個文件:log.1, log.2,log.3,... hadoop fs -getmerge /aaa/log.* ./log.sum |
-put 功能:等同於copyFromLocal 示例:hadoop fs -put /aaa/jdk.tar.gz /bbb/jdk.tar.gz.2
|
-rm 功能:刪除文件或文件夾 示例:hadoop fs -rm -r /aaa/bbb/
-rmdir 功能:刪除空目錄 示例:hadoop fs -rmdir /aaa/bbb/ccc |
-df 功能:統計文件系統的可用空間信息 示例:hadoop fs -df -h /
-du 功能:統計文件夾的大小信息 示例: hadoop fs -du -s -h /aaa/*
|
-count 功能:統計一個指定目錄下的文件節點數量 示例:hadoop fs -count /aaa/
|
-setrep 功能:設置hdfs中文件的副本數量 示例:hadoop fs -setrep 3 /aaa/jdk.tar.gz
|
三.Hdfs的工做機制
1.概述
(1)HDFS集羣分爲兩大角色:NameNode、DataNode
(2)NameNode負責管理整個文件系統的元數據
(3)DataNode 負責管理用戶的文件數據塊
(4)文件會按照固定的大小(blocksize)切成若干塊後分布式存儲在若干臺datanode上
(5)每個文件塊能夠有多個副本,並存放在不一樣的datanode上
(6)Datanode會按期向Namenode彙報自身所保存的文件block信息,而namenode則會負責保持文件的副本數量
(7)HDFS的內部工做機制對客戶端保持透明,客戶端請求訪問HDFS都是經過向namenode申請來進行
2.HDFS寫數據流程
(1)客戶端要向HDFS寫數據,首先要跟namenode通訊以確承認以寫文件並得到接收文件block的datanode,而後,客戶端按順序將文件逐個block傳遞給相應datanode,並由接收到block的datanode負責向其餘datanode複製block的副本
(2)詳細步驟解析
1.根namenode通訊請求上傳文件,namenode檢查目標文件是否已存在,父目錄是否存在
2.namenode返回是否能夠上傳
3.client請求第一個 block該傳輸到哪些datanode服務器上
4.namenode返回3個datanode服務器ABC
5.client請求3臺dn中的一臺A上傳數據(本質上是一個RPC調用,創建pipeline),A收到請求會繼續調用B,而後B調用C,將真個pipeline創建完成,逐級返回客戶端
6.client開始往A上傳第一個block(先從磁盤讀取數據放到一個本地內存緩存),以packet爲單位,A收到一個packet就會傳給B,B傳給C;A每傳一個packet會放入一個應答隊列等待應答
7.當一個block傳輸完成以後,client再次請求namenode上傳第二個block的服務器。
3.HDFS讀數據流程
(1)客戶端將要讀取的文件路徑發送給namenode,namenode獲取文件的元信息(主要是block的存放位置信息)返回給客戶端,客戶端根據返回的信息找到相應datanode逐個獲取文件的block並在客戶端本地進行數據追加合併從而得到整個文件
(2)詳細步驟解析
1.跟namenode通訊查詢元數據,找到文件塊所在的datanode服務器
2.挑選一臺datanode(就近原則,而後隨機)服務器,請求創建socket流
3.datanode開始發送數據(從磁盤裏面讀取數據放入流,以packet爲單位來作校驗)
4.客戶端以packet爲單位接收,如今本地緩存,而後寫入目標文件
四.Namenode工做機制
1.Namenode工做職責:負責客戶端請求的響應,元數據的管理(查詢,修改)
2.元數據管理 : namenode對數據的管理採用了三種存儲形式:內存元數據(NameSystem) 磁盤元數據鏡像文件數據操做日誌文件(可經過日誌運算出元數據)
3.元數據存儲機制:
A、內存中有一份完整的元數據(內存meta data)
B、磁盤有一個「準完整」的元數據鏡像(fsimage)文件(在namenode的工做目錄中)
C、用於銜接內存metadata和持久化元數據鏡像fsimage之間的操做日誌(edits文件)注:當客戶端對hdfs中的文件進行新增或者修改操做,操做記錄首先被記入edits日誌文件中,當客戶端操做成功後,相應的元數據會更新到內存meta.data中
4.元數據的checkpoint:每隔一段時間,會由secondary namenode將namenode上積累的全部edits和一個最新的fsimage下載到本地,並加載到內存進行merge(這個過程稱爲checkpoint)
dfs.namenode.checkpoint.check.period=60 #檢查觸發條件是否知足的頻率,60秒
dfs.namenode.checkpoint.dir=file://${hadoop.tmp.dir}/dfs/namesecondary
#以上兩個參數作checkpoint操做時,secondary namenode的本地工做目錄
dfs.namenode.checkpoint.edits.dir=${dfs.namenode.checkpoint.dir}
dfs.namenode.checkpoint.max-retries=3 #最大重試次數
dfs.namenode.checkpoint.period=3600 #兩次checkpoint之間的時間間隔3600秒
dfs.namenode.checkpoint.txns=1000000 #兩次checkpoint之間最大的操做記錄
checkpoint的附帶做用
namenode和secondary namenode的工做目錄存儲結構徹底相同,因此,當namenode故障退出須要從新恢復時,能夠從secondary namenode的工做目錄中將fsimage拷貝到namenode的工做目錄,以恢復namenode的元數據
五.Datanode工做機制
1.Datanode工做職責:存儲管理用戶的文件塊數據,按期向namenode彙報自身所持有的block信息(經過心跳信息上報)
<property>
<name>dfs.blockreport.intervalMsec</name>
<value>3600000</value>
<description>Determines block reporting interval in milliseconds.</description>
</property>
2.Datanode掉線判斷時限參數
datanode進程死亡或者網絡故障形成datanode沒法與namenode通訊,namenode不會當即把該節點斷定爲死亡,要通過一段時間,這段時間暫稱做超時時長。HDFS默認的超時時長爲10分鐘+30秒。若是定義超時時間爲timeout,則超時時長的計算公式爲:
timeout = 2 * heartbeat.recheck.interval + 10 * dfs.heartbeat.interval。 而默認的heartbeat.recheck.interval 大小爲5分鐘,dfs.heartbeat.interval默認爲3秒。
須要注意的是hdfs-site.xml 配置文件中的heartbeat.recheck.interval的單位爲毫秒,dfs.heartbeat.interval的單位爲秒。因此,舉個例子,若是heartbeat.recheck.interval設置爲5000(毫秒),dfs.heartbeat.interval設置爲3(秒,默認),則總的超 時時間爲40秒。
<property>
<name>heartbeat.recheck.interval</name>
<value>2000</value>
</property>
<property>
<name>dfs.heartbeat.interval</name>
<value>1</value>
</property>
六. Hdfs的java操做
1.搭建開發環境
(1)引入依賴
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>2.6.1</version>
</dependency>
(2)window下開發的說明 建議在linux下進行hadoop應用的開發,不會存在兼容性問題。如在window上作客戶端應用開發,須要設置如下環境:
A、在windows的某個目錄下解壓一個hadoop的安裝包
B、將安裝包下的lib和bin目錄用對應windows版本平臺編譯的本地庫替換
C、在window系統中配置HADOOP_HOME指向你解壓的安裝包
D、在windows系統的path變量中加入hadoop的bin目錄
2.HDFS客戶端操做數據代碼示例:
(1) 文件的增刪改查
public class HdfsClient {
FileSystem fs = null;
@Before public void init() throws Exception {
// 構造一個配置參數對象,設置一個參數:咱們要訪問的hdfs的URI // 從而FileSystem.get()方法就知道應該是去構造一個訪問hdfs文件系統的客戶端,以及hdfs的訪問地址 // new Configuration();的時候,它就會去加載jar包中的hdfs-default.xml // 而後再加載classpath下的hdfs-site.xml Configuration conf = new Configuration(); conf.set("fs.defaultFS", "hdfs://hdp-node01:9000"); /** * 參數優先級: 一、客戶端代碼中設置的值 二、classpath下的用戶自定義配置文件 三、而後是服務器的默認配置 */ conf.set("dfs.replication", "3");
// 獲取一個hdfs的訪問客戶端,根據參數,這個實例應該是DistributedFileSystem的實例 // fs = FileSystem.get(conf);
// 若是這樣去獲取,那conf裏面就能夠不要配"fs.defaultFS"參數,並且,這個客戶端的身份標識已是hadoop用戶 fs = FileSystem.get(new URI("hdfs://hdp-node01:9000"), conf, "hadoop");
}
/** * 往hdfs上傳文件 * * @throws Exception */ @Test public void testAddFileToHdfs() throws Exception {
// 要上傳的文件所在的本地路徑 Path src = new Path("g:/redis-recommend.zip"); // 要上傳到hdfs的目標路徑 Path dst = new Path("/aaa"); fs.copyFromLocalFile(src, dst); fs.close(); }
/** * 從hdfs中複製文件到本地文件系統 * * @throws IOException * @throws IllegalArgumentException */ @Test public void testDownloadFileToLocal() throws IllegalArgumentException, IOException { fs.copyToLocalFile(new Path("/jdk-7u65-linux-i586.tar.gz"), new Path("d:/")); fs.close(); }
@Test public void testMkdirAndDeleteAndRename() throws IllegalArgumentException, IOException {
// 建立目錄 fs.mkdirs(new Path("/a1/b1/c1"));
// 刪除文件夾 ,若是是非空文件夾,參數2必須給值true fs.delete(new Path("/aaa"), true);
// 重命名文件或文件夾 fs.rename(new Path("/a1"), new Path("/a2"));
}
/** * 查看目錄信息,只顯示文件 * * @throws IOException * @throws IllegalArgumentException * @throws FileNotFoundException */ @Test public void testListFiles() throws FileNotFoundException, IllegalArgumentException, IOException {
// 思考:爲何返回迭代器,而不是List之類的容器 RemoteIterator<LocatedFileStatus> listFiles = fs.listFiles(new Path("/"), true);
while (listFiles.hasNext()) { LocatedFileStatus fileStatus = listFiles.next(); System.out.println(fileStatus.getPath().getName()); System.out.println(fileStatus.getBlockSize()); System.out.println(fileStatus.getPermission()); System.out.println(fileStatus.getLen()); BlockLocation[] blockLocations = fileStatus.getBlockLocations(); for (BlockLocation bl : blockLocations) { System.out.println("block-length:" + bl.getLength() + "--" + "block-offset:" + bl.getOffset()); String[] hosts = bl.getHosts(); for (String host : hosts) { System.out.println(host); } } System.out.println("--------------爲angelababy打印的分割線--------------"); } }
/** * 查看文件及文件夾信息 * * @throws IOException * @throws IllegalArgumentException * @throws FileNotFoundException */ @Test public void testListAll() throws FileNotFoundException, IllegalArgumentException, IOException {
FileStatus[] listStatus = fs.listStatus(new Path("/"));
String flag = "d-- "; for (FileStatus fstatus : listStatus) { if (fstatus.isFile()) flag = "f-- "; System.out.println(flag + fstatus.getPath().getName()); } } } |
(2)經過流的方式訪問hdfs
/** * 相對那些封裝好的方法而言的更底層一些的操做方式 * 上層那些mapreduce spark等運算框架,去hdfs中獲取數據的時候,就是調的這種底層的api * @author * */ public class StreamAccess {
FileSystem fs = null;
@Before public void init() throws Exception {
Configuration conf = new Configuration(); fs = FileSystem.get(new URI("hdfs://hdp-node01:9000"), conf, "hadoop");
}
@Test public void testDownLoadFileToLocal() throws IllegalArgumentException, IOException{
//先獲取一個文件的輸入流----針對hdfs上的 FSDataInputStream in = fs.open(new Path("/jdk-7u65-linux-i586.tar.gz"));
//再構造一個文件的輸出流----針對本地的 FileOutputStream out = new FileOutputStream(new File("c:/jdk.tar.gz"));
//再將輸入流中數據傳輸到輸出流 IOUtils.copyBytes(in, out, 4096);
}
/** * hdfs支持隨機定位進行文件讀取,並且能夠方便地讀取指定長度 * 用於上層分佈式運算框架併發處理數據 * @throws IllegalArgumentException * @throws IOException */ @Test public void testRandomAccess() throws IllegalArgumentException, IOException{ //先獲取一個文件的輸入流----針對hdfs上的 FSDataInputStream in = fs.open(new Path("/iloveyou.txt"));
//能夠將流的起始偏移量進行自定義 in.seek(22);
//再構造一個文件的輸出流----針對本地的 FileOutputStream out = new FileOutputStream(new File("c:/iloveyou.line.2.txt"));
IOUtils.copyBytes(in,out,19L,true);
}
/** * 顯示hdfs上文件的內容 * @throws IOException * @throws IllegalArgumentException */ @Test public void testCat() throws IllegalArgumentException, IOException{
FSDataInputStream in = fs.open(new Path("/iloveyou.txt"));
IOUtils.copyBytes(in, System.out, 1024); } } |