HDFS基本概念java
一、HDFS設計思想node
分而治之:將大文件、大批量文件,分佈式存放在大量服務器上,以便於採起分而治之的方式對海量數據進行運算分析linux
二、概念和特性web
概念:HDFS是一個分佈式的文件系統。shell
特性:apache
(1)HDFS中的文件在物理上是分塊存儲(block),塊的大小能夠經過配置參數( dfs.blocksize)來規定,默認大小在hadoop2.x版本中是128M,老版本中是64M編程
(2)HDFS文件系統會給客戶端提供一個統一的抽象目錄樹,客戶端經過路徑來訪問文件,形如:hdfs://namenode:port/dir-a/dir-b/dir-c/file.datawindows
(3)目錄結構及文件分塊信息(元數據)的管理由namenode節點承擔api
——namenode是HDFS集羣主節點,負責維護整個hdfs文件系統的目錄樹,以及每個路徑(文件)所對應的block塊信息(block的id,及所在的datanode服務器)緩存
(4)文件的各個block的存儲管理由datanode節點承擔
---- datanode是HDFS集羣從節點,每個block均可以在多個datanode上存儲多個副本(副本數量也能夠經過參數設置dfs.replication)
(5)HDFS是設計成適應一次寫入,屢次讀出的場景,且不支持文件的修改
(注:適合用來作數據分析,並不適合用來作網盤應用,由於,不便修改,延遲大,網絡開銷大,成本過高)
HDFS基本操做(shell操做)
一、HDFS命令行客戶端使用
二、命令行客戶端支持的命令參數
三、經常使用命令參數介紹
-help 功能:輸出這個命令參數手冊 hadoop fs -help |
-ls 功能:顯示目錄信息 示例: hadoop fs -ls hdfs://hadoop1:9000/ 備註:這些參數中,全部的hdfs路徑均可以簡寫 -->hadoop fs -ls / 等同於上一條命令的效果(hadoop fs -ls -R / 會列出全部嵌套文件) |
-mkdir 功能:在hdfs上建立目錄 示例:hadoop fs -mkdir -p /aaa/bbb/cc/dd |
-moveFromLocal 功能:從本地剪切粘貼到hdfs 示例:hadoop fs - moveFromLocal /home/hadoop/a.txt /aaa/bbb/cc/dd -moveToLocal 功能:從hdfs剪切粘貼到本地 示例:hadoop fs - moveToLocal /aaa/bbb/cc/dd /home/hadoop/a.txt |
--appendToFile 功能:追加一個文件到已經存在的文件末尾 示例:hadoop fs -appendToFile ./hello.txt hdfs://hadoop-server01:9000/hello.txt 能夠簡寫爲: hadoop fs -appendToFile ./hello.txt /hello.txt
|
-cat 功能:顯示文件內容 示例:hadoop fs -cat /hello.txt
-tail 功能:顯示一個文件的末尾 示例:hadoop fs -tail /weblog/access_log.1 -text 功能:以字符形式打印一個文件的內容 示例:hadoop fs -text /weblog/access_log.1 |
-chgrp -chmod -chown 功能:linux文件系統中的用法同樣,對文件所屬權限 示例: hadoop fs -chmod 666 /hello.txt hadoop fs -chown someuser:somegrp /hello.txt |
-copyFromLocal 功能:從本地文件系統中拷貝文件到hdfs路徑去 示例:hadoop fs -copyFromLocal ./jdk.tar.gz /aaa/ -copyToLocal 功能:從hdfs拷貝到本地 示例:hadoop fs -copyToLocal /aaa/jdk.tar.gz / |
-cp 功能:從hdfs的一個路徑拷貝hdfs的另外一個路徑 示例: hadoop fs -cp /aaa/jdk.tar.gz /bbb/jdk.tar.gz.2
-mv 功能:在hdfs目錄中移動文件 示例: hadoop fs -mv /aaa/jdk.tar.gz / |
-get 功能:等同於copyToLocal,就是從hdfs下載文件到本地 示例:hadoop fs -get /aaa/jdk.tar.gz / -getmerge 功能:合併下載多個文件 示例:好比hdfs的目錄 /aaa/下有多個文件:log.1, log.2,log.3,... hadoop fs -getmerge /aaa/log.* ./log.sum |
-put 功能:等同於copyFromLocal 示例:hadoop fs -put /aaa/jdk.tar.gz /bbb/jdk.tar.gz.2
|
-rm 功能:刪除文件或文件夾 示例:hadoop fs -rm -r /aaa/bbb/
-rmdir 功能:刪除空目錄 示例:hadoop fs -rmdir /aaa/bbb/ccc |
-df 功能:統計文件系統的可用空間信息 示例:hadoop fs -df -h /
-du 功能:統計文件夾的大小信息 示例: hadoop fs -du -s -h /aaa/*
|
-count 功能:統計一個指定目錄下的文件節點數量 示例:hadoop fs -count /aaa/
|
-setrep 功能:設置hdfs中文件的副本數量 示例:hadoop fs -setrep 3 /aaa/jdk.tar.gz <這裏設置的副本數只是記錄在namenode的元數據中,是否真的會有這麼多副本,還得看datanode的數量>
|
HDFS原理
一、HDFS的工做機制
1.1概述
1.2HDFS寫數據流程
1.2.1概述
客戶端要向HDFS寫數據,首先要跟namenode通訊以確承認以寫文件並得到接收文件block的datanode,而後,客戶端按順序將文件逐個block傳遞給相應datanode,並由接收到block的datanode負責向其餘datanode複製block的副本
1.2.2詳細步驟圖(上傳文件)
1.2.3詳細步驟解析
1、client跟namenode通訊,請求上傳文件,namenode檢查目錄樹中目標文件是否已存在?父目錄是否存在?
2、namenode返回是否能夠上傳
3、client請求第一個block該傳輸到哪些datanode服務器上
4、namenode查詢DataNode信息,而後返回3個可用的datanode服務器ABC給client
5、client請求3臺DataNode中的一臺A上傳數據(本質上是一個RPC調用,創建pipeline),A收到請求會繼續調用B,而後B調用C,將整個pipeline創建完成,逐級返回客戶端
6、client開始往A上傳第一個block(先從磁盤讀取數據放到一個本地內存緩存),以packet爲單位,A收到一個packet就會傳給B,B傳給C;A每傳一個packet會放入一個應答隊列等待應答
7、當一個block傳輸完成以後,client再次請求namenode上傳第二個block的服務器。
1.3HDFS讀數據流程
1.3.1概述
客戶端將要讀取的文件路徑發送給namenode,namenode獲取文件的元信息(主要是block的存放位置信息)返回給客戶端,客戶端根據返回的信息找到相應datanode逐個獲取文件的block並在客戶端本地進行數據追加合併從而得到整個文件
1.3.2詳細步驟圖(下載文件)
1.3.3詳細步驟解析
1、client跟namenode通訊請求讀取文件,namenode查詢元數據,找到文件塊所在的datanode服務器,而後將每一個文件塊所在的DataNode服務器返回給client
2、挑選一臺datanode(就近原則,而後隨機)服務器,請求創建socket流
3、datanode開始發送數據(從磁盤裏面讀取數據放入流,以packet爲單位來作校驗)
4、客戶端以packet爲單位接收,先在本地緩存,而後寫入目標文件
二、NameNode工做機制
2.1namenode職責
負責客戶端青請求的響應
元數據的管理(查詢、修改)
2.2元數據管理
namenode對數據的管理採用了三種存儲形式:
內存元數據(NameSystem)
磁盤元數據鏡像文件
數據操做日誌文件(可經過日誌運算出元數據)
2.2.1元數據存儲機制
A、內存中有一份完整的元數據(內存meta data)
B、磁盤有一個「準完整」的元數據鏡像(fsimage)文件(在namenode的工做目錄中)
C、用於銜接內存metadata和持久化元數據鏡像fsimage之間的操做日誌(edits文件)
注:當客戶端對hdfs中的文件進行新增或者修改操做,操做記錄首先被記入edits日誌文件中,當客戶端操做成功後,相應的元數據會更新到內存meta.data中
2.2.2元數據手動查看
能夠經過hdfs的一個工具來查看edits中的信息
bin/hdfs oev -i edits -o edits.xml
bin/hdfs oiv -i fsimage_0000000000000000087 -p XML -o fsimage.xml
2.2.3元數據的checkpoint
每隔一段時間,會由secondary namenode將namenode上積累的全部edits和一個最新的fsimage下載到本地,並加載到內存進行merge(這個過程稱爲checkpoint)
dfs.namenode.checkpoint.check.period=60 #檢查觸發條件是否知足的頻率,60秒 dfs.namenode.checkpoint.dir=file://${hadoop.tmp.dir}/dfs/namesecondary #以上兩個參數作checkpoint操做時,secondary namenode的本地工做目錄 dfs.namenode.checkpoint.edits.dir=${dfs.namenode.checkpoint.dir}
dfs.namenode.checkpoint.max-retries=3 #最大重試次數 dfs.namenode.checkpoint.period=3600 #兩次checkpoint之間的時間間隔3600秒 dfs.namenode.checkpoint.txns=1000000 #兩次checkpoint之間最大的操做記錄 |
namenode和secondary namenode的工做目錄存儲結構徹底相同,因此,當namenode故障退出須要從新恢復時,能夠從secondary namenode的工做目錄中將fsimage拷貝到namenode的工做目錄,以恢復namenode的元數據
三、DataNode工做機制
1、Datanode工做職責:
存儲管理用戶的文件塊數據
按期向namenode彙報自身所持有的block信息(經過心跳信息上報)
(這點很重要,由於,當集羣中發生某些block副本失效時,集羣如何恢復block初始副本數量的問題)
<property> <name>dfs.blockreport.intervalMsec</name> <value>3600000</value> <description>Determines block reporting interval in milliseconds.</description> </property> |
2、Datanode掉線判斷時限參數
datanode進程死亡或者網絡故障形成datanode沒法與namenode通訊,namenode不會當即把該節點斷定爲死亡,要通過一段時間,這段時間暫稱做超時時長。HDFS默認的超時時長爲10分鐘+30秒。若是定義超時時間爲timeout,則超時時長的計算公式爲:
timeout = 2 * heartbeat.recheck.interval + 10 * dfs.heartbeat.interval。
而默認的heartbeat.recheck.interval 大小爲5分鐘,dfs.heartbeat.interval默認爲3秒。
須要注意的是hdfs-site.xml 配置文件中的heartbeat.recheck.interval的單位爲毫秒,dfs.heartbeat.interval的單位爲秒。因此,舉個例子,若是heartbeat.recheck.interval設置爲5000(毫秒),dfs.heartbeat.interval設置爲3(秒,默認),則總的超時時間爲40秒。
<property> <name>heartbeat.recheck.interval</name> <value>2000</value> </property> <property> <name>dfs.heartbeat.interval</name> <value>1</value> </property> |
上傳一個文件,觀察文件的block具體的物理存放狀況:
在每一臺datanode機器上的這個目錄中能找到文件的切塊:
/usr/local/hadoop/tmp/dfs/data/current/BP-193442119-192.168.2.120-1432457733977/current/finalized
HDFS應用開發(Java操做)
一、搭建開發環境
1.1引入依賴
手動引入jar包,hdfs的jar包位於hadoop安裝目錄的share文件夾下。
建立一個hdfsjar用戶類庫,而後將common和hdfs兩個文件夾中的jar包所有添加進去。
1.2Windows下開發的說明
建議在Linux下進行hadoop應用的開發,這樣不會存在兼容性問題。
若是在Windows上作客戶端應用開發,須要設置一下環境:
A、在windows的某個目錄下解壓一個hadoop的安裝 d:\hadoop-2.6.4
B、將安裝包下的lib和bin目錄用對應windows版本平臺編譯的本地庫替換
C、在window系統中配置HADOOP_HOME指向你解壓的安裝包
HADOOP_HOME=d:\hadoop-2.6.4
D、在windows系統的path變量中加入hadoop的bin目錄
PATH=d:\hadoop-2.6.4\bin
二、獲取API中的客戶端對象
在java中操做hdfs,首先要得到一個客戶端實例
Configuration conf = new Configuration() FileSystem fs = FileSystem.get(conf) |
而咱們的操做目標是HDFS,因此獲取到的fs對象應該是DistributedFileSystem的實例;
get方法是從何處判斷具體實例化那種客戶端類呢?
——從conf中的一個參數 fs.defaultFS的配置值判斷;
若是咱們的代碼中沒有指定fs.defaultFS,而且工程classpath下也沒有給定相應的配置,conf中的默認值就來自於hadoop的jar包中的core-default.xml,默認值爲: file:///,則獲取的將不是一個DistributedFileSystem的實例,而是一個本地文件系統的客戶端對象
三、DistributedFileSystem實例對象所具有的方法
四、hdfs客戶端操做數據代碼
4.1文件的增刪改查
1 package com.ahu.bigdata.javaclient; 2 3 import java.net.URI; 4 import org.apache.hadoop.conf.Configuration; 5 import org.apache.hadoop.fs.BlockLocation; 6 import org.apache.hadoop.fs.FileStatus; 7 import org.apache.hadoop.fs.FileSystem; 8 import org.apache.hadoop.fs.LocatedFileStatus; 9 import org.apache.hadoop.fs.Path; 10 import org.apache.hadoop.fs.RemoteIterator; 11 12 /** 13 * 使用hdfsJava客戶端實現文件的增刪改查 14 * 15 * @author ahu_lichang 16 * 17 */ 18 public class HdfsClientDemo { 19 static FileSystem fs = null; 20 21 public static void init() throws Exception { 22 // 構造一個配置參數對象,設置一個參數:咱們要訪問的hdfs的URI 23 // 從而FileSystem.get()方法就知道應該是去構造一個訪問hdfs文件系統的客戶端,以及hdfs的訪問地址 24 // new Configuration();的時候,它就會去加載jar包中的hdfs-default.xml 25 // 而後再加載classpath下的hdfs-site.xml 26 Configuration configuration = new Configuration(); 27 // configuration.set("fs.defaultFS", "hdfs://hadoop1:9000"); 28 /* 29 * 參數優先級:一、客戶端代碼中設置的值 二、classpath下的用戶自定義配置文件 三、服務器的默認設置 30 */ 31 // configuration.set("dfs.replication", "3"); 32 // 獲取一個hdfs的訪問客戶端,根據參數,這個實例應該是DistributedFileSystem的實例 33 // fs=FileSystem.get(configuration); 34 // 若是這樣去獲取,那configuration裏面就能夠不要配"fs.defaultFS"參數,並且,這個客戶端的身份標識已是root用戶 35 fs = FileSystem.get(new URI("hdfs://hadoop1:9000"), configuration, 36 "root"); 37 } 38 39 /** 40 * 向hdfs上傳文件 41 * 42 * @throws Exception 43 */ 44 public static void testAddFileToHdfs() throws Exception { 45 // 本地路徑 46 Path src = new Path("E:/access.log"); 47 // 目標路徑 48 Path dst = new Path("/access.log.copy"); 49 fs.copyFromLocalFile(src, dst); 50 fs.close(); 51 } 52 53 /** 54 * 從hdfs中複製文件到本地文件系統 55 * 56 * @throws Exception 57 */ 58 public static void testDownloadFileToLocal() throws Exception { 59 fs.copyToLocalFile(new Path("/access.log.copy"), new Path("E:/")); 60 fs.close(); 61 } 62 63 /** 64 * 在hdfs中建立目錄、刪除文件夾、重命名文件或文件夾 65 * 66 * @throws Exception 67 */ 68 public static void testMkdirAndDeleteAndRename() throws Exception { 69 // 建立目錄 70 fs.mkdirs(new Path("/a1/b1/c1")); 71 // 刪除文件夾,若是是非空文件夾,參數2必須給值true 72 fs.delete(new Path("/access.log.copy"), true); 73 // 重命名文件或文件夾 74 fs.rename(new Path("/a1"), new Path("/a2")); 75 } 76 77 /** 78 * 查看目錄信息,只顯示文件 79 * 80 * @throws Exception 81 */ 82 public static void testListFiles() throws Exception { 83 RemoteIterator<LocatedFileStatus> listFiles = fs.listFiles( 84 new Path("/"), true); 85 while (listFiles.hasNext()) { 86 LocatedFileStatus fileStatus = listFiles.next(); 87 System.out.println(fileStatus.getPath().getName());// job 88 System.out.println(fileStatus.getBlockSize());// 塊大小(都同樣大) 89 System.out.println(fileStatus.getPermission());// 權限 90 System.out.println(fileStatus.getLen());// 塊大小 91 BlockLocation[] blockLocations = fileStatus.getBlockLocations(); 92 for (BlockLocation blockLocation : blockLocations) { 93 System.out.println("block-length:" + blockLocation.getLength() 94 + "---" + "block-offset" + blockLocation.getOffset());// 塊長度、塊的起始偏移量 95 String[] hosts = blockLocation.getHosts(); 96 for (String host : hosts) {// 塊分佈在哪些主機上 97 System.out.println(host); 98 } 99 } 100 System.out 101 .println("----------------------------------------------------------"); 102 } 103 } 104 105 /** 106 * 查看文件及文件夾信息(跟hadoop fs -ls /查詢的效果差很少) 107 * 108 * @throws Exception 109 */ 110 public static void testListAll() throws Exception { 111 FileStatus[] listStatus = fs.listStatus(new Path("/")); 112 String flag = "d-- "; 113 for (FileStatus fileStatus : listStatus) { 114 if (fileStatus.isFile()) 115 flag = "f-- "; 116 System.out.println(flag + fileStatus.getPath().getName()); 117 } 118 } 119 120 public static void main(String[] args) throws Exception { 121 init(); 122 // testAddFileToHdfs(); 123 // testDownloadFileToLocal(); 124 // testMkdirAndDeleteAndRename(); 125 // testListFiles(); 126 testListAll(); 127 } 128 }
4.2經過流的方式訪問hdfs和場景編程
1 package com.ahu.bigdata.javaclient; 2 3 import java.io.File; 4 import java.io.FileOutputStream; 5 import java.io.IOException; 6 import java.net.URI; 7 import org.apache.hadoop.conf.Configuration; 8 import org.apache.hadoop.fs.BlockLocation; 9 import org.apache.hadoop.fs.FSDataInputStream; 10 import org.apache.hadoop.fs.FileStatus; 11 import org.apache.hadoop.fs.FileSystem; 12 import org.apache.hadoop.fs.Path; 13 import org.apache.hadoop.io.IOUtils; 14 15 /** 16 * 經過流的方式訪問hdfs 17 * 18 * 相對於那些封裝好的方法而言的,是更底層的一些操做方式 19 * 20 * 上層的那些mapreduce、spark等運算框架,去hdfs中獲取數據的時候,就是調用這種底層的api 21 * 22 * @author ahu_lichang 23 * 24 */ 25 public class StreamAccess { 26 static FileSystem fs = null; 27 28 public static void init() throws Exception { 29 Configuration configuration = new Configuration(); 30 fs = FileSystem.get(new URI("hdfs://hadoop1:9000"), configuration, 31 "root"); 32 } 33 34 /** 35 * 將hdfs中的文件下載到本地 36 * 37 * @throws Exception 38 */ 39 public static void testDownloadFileToLocal() throws Exception { 40 // 先獲取一個文件的輸入流---針對hdfs 41 FSDataInputStream inputStream = fs.open(new Path( 42 "/wordcount/output/part-r-00000")); 43 // 再構造一個文件的輸出流---針對本地 44 FileOutputStream outputStream = new FileOutputStream(new File( 45 "E:/part-r-00000")); 46 // 將輸入流中數據傳輸到輸出流中 47 IOUtils.copyBytes(inputStream, outputStream, 4096);// 4096緩衝區大小 48 } 49 50 /** 51 * hdfs支持隨機定位進行文件讀取,並且能夠方便地讀取指定長度 52 * 53 * 用於上層分佈式運算框架併發處理數據 54 * 55 * @throws Exception 56 */ 57 public static void testRandomAccess() throws Exception { 58 // 先獲取一個文件的輸入流---針對hdfs 59 FSDataInputStream inputStream = fs.open(new Path( 60 "/wordcount/output/part-r-00000")); 61 // 能夠將輸入流的起始偏移量進行自定義 62 inputStream.seek(200); 63 // 再構造一個文件的輸出流---針對本地 64 FileOutputStream outputStream = new FileOutputStream(new File( 65 "E:/part-r-custom")); 66 IOUtils.copyBytes(inputStream, outputStream, 4096L, true);// true表示傳輸完畢關閉流 67 } 68 69 /** 70 * 顯示hdfs文件上的內容 71 * 72 * @throws Exception 73 */ 74 public static void testCat() throws Exception { 75 FSDataInputStream inputStream = fs.open(new Path( 76 "/wordcount/output/part-r-00000")); 77 IOUtils.copyBytes(inputStream, System.out, 1024); 78 } 79 80 /** 81 * 場景編程:獲取一個文件的全部block位置信息,而後讀取指定block中的內容 82 * 83 * @throws IOException 84 * @throws IllegalArgumentException 85 */ 86 public static void testBlockCat() throws Exception { 87 FSDataInputStream inputStream = fs.open(new Path( 88 "/wordcount/output/part-r-00000")); 89 // 拿到文件信息 90 FileStatus[] listStatus = fs.listStatus(new Path( 91 "/wordcount/output/part-r-00000")); 92 // 獲取這個文件的全部block的信息 93 BlockLocation[] fileBlockLocations = fs.getFileBlockLocations( 94 listStatus[0], 0L, listStatus[0].getLen()); 95 // 第一個block的長度 96 long length = fileBlockLocations[0].getLength(); 97 // 第一個block的起始偏移量 98 long offset = fileBlockLocations[0].getOffset(); 99 System.out.println(length); 100 System.out.println(offset); 101 // 獲取第一個block寫入輸出流 102 byte[] b = new byte[4096]; 103 FileOutputStream fos = new FileOutputStream(new File("E:/block0")); 104 while (inputStream.read(offset, b, 0, 4096) != -1) { 105 fos.write(b); 106 offset += 4096; 107 if (offset >= length) 108 return; 109 } 110 fos.flush(); 111 fos.close(); 112 inputStream.close(); 113 } 114 115 public static void main(String[] args) throws Exception { 116 init(); 117 // testDownloadFileToLocal(); 118 // testRandomAccess(); 119 // testCat(); 120 testBlockCat(); 121 } 122 123 }