hadoop之HDFS學習筆記(一)

主要內容:hdfs的總體運行機制,DATANODE存儲文件塊的觀察,hdfs集羣的搭建與配置,hdfs命令行客戶端常見命令;業務系統中日誌生成機制,HDFS的java客戶端api基本使用。java

一、什麼是大數據

基本概念

《數據處理》node

在互聯網技術發展到現今階段,大量平常、工做等事務產生的數據都已經信息化,人類產生的數據量相比之前有了爆炸式的增加,之前的傳統的數據處理技術已經沒法勝任,需求催生技術,一套用來處理海量數據的軟件工具應運而生,這就是大數據!linux

 

處理海量數據的核心技術:web

海量數據存儲:分佈式算法

海量數據運算:分佈式sql

大數據的海量數據的存儲和運算,核心技術就是分佈式。數據庫

 

這些核心技術的實現是不須要用戶從零開始造輪子的express

存儲和運算,都已經有大量的成熟的框架來用apache

 

存儲框架:編程

HDFS——分佈式文件存儲系統(HADOOP中的存儲框架)

HBASE——分佈式數據庫系統

KAFKA——分佈式消息緩存系統(實時流式數據處理場景中應用普遍)

 

文件系統中的數據以非結構化居多,沒有直觀的結構,數據庫中的信息多以表的形式存在,具備結構化,存在規律;

查詢的時候文本文件只能一行一行掃描,而數據庫效率高不少,能夠利用sql查詢語法,數據庫在存和取方便的多。

數據庫和文件系統相比,數據庫至關於在特定的文件系統上的軟件封裝。其實HBASE就是對HDFS的進一層封裝,它的底層文件系統就是HDFS。

分佈式消息緩存系統,既然是分佈式,那就意味着橫跨不少機器,意味着容量能夠很大。和前二者相比它的數據存儲形式是消息(不是表,也不是文件),消息能夠看作有固定格式的一條數據,好比消息頭,消息體等,消息體能夠是json,數據庫的一條記錄,一個序列化對象等。消息最終存放在kafaka內部的特定的文件系統裏。

 

運算框架:(要解決的核心問題就是幫用戶將處理邏輯在不少機器上並行

MAPREDUCE—— 離線批處理/HADOOP中的運算框架

SPARK —— 離線批處理/實時流式計算

STORM —— 實時流式計算

 

離線批處理:數據是靜態的,一次處理一大批數據。

實時流式:數據在源源不斷的生成,邊生成,邊計算

這些運算框架的思想都差很少,特別是mapreduce和spark,簡單來看spark是對mapreduce的進一步封裝;

運算框架和存儲框架之間沒有強耦合關係,spark能夠讀HDFS,HBASE,KAFKA裏的數據,固然須要存儲框架提供訪問接口。

 

輔助類的工具(解放大數據工程師的一些繁瑣工做):

HIVE —— 數據倉庫工具:能夠接收sql,翻譯成mapreduce或者spark程序運行

FLUME——數據採集

SQOOP——數據遷移

ELASTIC SEARCH —— 分佈式的搜索引擎

flume用於自動採集數據源機器上的數據到大數據集羣中。

HIVE看起來像一個數據庫,但其實不是,Hive中存了一些須要分析的數據,而後在直接寫sql進行分析,hive接收sql,翻譯成mapreduce或者spark程序運行;

hive本質是mapreduce或spark,咱們只須要寫sql邏輯而不是mapreduce邏輯,Hive自動完成對sql的翻譯,並且仍是在海量數據集上。

.......

 

換個角度說,大數據是:

一、有海量的數據

二、有對海量數據進行挖掘的需求

三、有對海量數據進行挖掘的軟件工具(hadoop、spark、storm、flink、tez、impala......)

大數據在現實生活中的具體應用

數據處理的最典型應用:公司的產品運營狀況分析

 

電商推薦系統:基於海量的瀏覽行爲、購物行爲數據,進行大量的算法模型的運算,得出各種推薦結論,以供電商網站頁面來爲用戶進行商品推薦

 

精準廣告推送系統:基於海量的互聯網用戶的各種數據,統計分析,進行用戶畫像(獲得用戶的各類屬性標籤),而後能夠爲廣告主進行有針對性的精準的廣告投放

二、什麼是hadoop

hadoop中有3個核心組件:

分佈式文件系統:HDFS —— 實現將文件分佈式存儲在不少的服務器上

分佈式運算編程框架:MAPREDUCE —— 實如今不少機器上分佈式並行運算

分佈式資源調度平臺:YARN —— 幫用戶調度大量的mapreduce程序,併合理分配運算資源

三、hdfs總體運行機制

hdfs:分佈式文件系統

hdfs有着文件系統共同的特徵:

一、有目錄結構,頂層目錄是:  /

二、系統中存放的就是文件

三、系統能夠提供對文件的:建立、刪除、修改、查看、移動等功能

 

hdfs跟普通的單機文件系統有區別:

一、單機文件系統中存放的文件,是在一臺機器的操做系統中

二、hdfs的文件系統會橫跨N多的機器

三、單機文件系統中存放的文件,是在一臺機器的磁盤上

四、hdfs文件系統中存放的文件,是落在n多機器的本地單機文件系統中(hdfs是一個基於linux本地文件系統之上的文件系統)

 

hdfs的工做機制:

一、客戶把一個文件存入hdfs,其實hdfs會把這個文件切塊後,分散存儲在N臺linux機器系統中(負責存儲文件塊的角色:data node)<準確來講:切塊的行爲是由客戶端決定的>

二、一旦文件被切塊存儲,那麼,hdfs中就必須有一個機制,來記錄用戶的每個文件的切塊信息,及每一塊的具體存儲機器(負責記錄塊信息的角色是:name node)

三、爲了保證數據的安全性,hdfs能夠將每個文件塊在集羣中存放多個副本(到底存幾個副本,是由當時存入該文件的客戶端指定的)

 

綜述:一個hdfs系統,由一臺運行了namenode的服務器,和N臺運行了datanode的服務器組成!

四、搭建hdfs分佈式集羣

4.1 hdfs集羣組成結構:

4.2 安裝hdfs集羣的具體步驟:

4.2.一、首先須要準備N臺linux服務器

學習階段,用虛擬機便可!

先準備4臺虛擬機:1個namenode節點  + 3 個datanode 節點

 

4.2.二、修改各臺機器的主機名和ip地址

主機名:hdp-01  對應的ip地址:192.168.33.11

主機名:hdp-02  對應的ip地址:192.168.33.12

主機名:hdp-03  對應的ip地址:192.168.33.13

主機名:hdp-04  對應的ip地址:192.168.33.14

4.2.三、從windows中用CRT軟件進行遠程鏈接

在windows中將各臺linux機器的主機名配置到的windows的本地域名映射文件中:

c:/windows/system32/drivers/etc/hosts

192.168.33.11 hdp-01

192.168.33.12 hdp-02

192.168.33.13 hdp-03

192.168.33.14 hdp-04

用crt鏈接上後,修改一下crt的顯示配置(字號,編碼集改成UTF-8):

 

 

 

4.2.三、配置linux服務器的基礎軟件環境

 

  •  防火牆

關閉防火牆:service iptables stop 

關閉防火牆自啓: chkconfig iptables off

 

  •  安裝jdk:(hadoop體系中的各軟件都是java開發的)

  1)        利用alt+p 打開sftp窗口,而後將jdk壓縮包拖入sftp窗口

  2)         而後在linux中將jdk壓縮包解壓到/root/apps 下

  3)         配置環境變量:JAVA_HOME   PATH

  vi /etc/profile   在文件的最後,加入:

export JAVA_HOME=/root/apps/jdk1.8.0_60 export PATH=$PATH:$JAVA_HOME/bin

  4)         修改完成後,記得 source /etc/profile使配置生效

  5)         檢驗:在任意目錄下輸入命令: java -version 看是否成功執行

  6)         將安裝好的jdk目錄用scp命令拷貝到其餘機器

  7)         將/etc/profile配置文件也用scp命令拷貝到其餘機器並分別執行source命令

  •   集羣內主機的域名映射配置

在hdp-01上,vi /etc/hosts

127.0.0.1   localhost localhost.localdomain localhost4 localhost4.localdomain4

::1         localhost localhost.localdomain localhost6 localhost6.localdomain6

192.168.33.11   hdp-01

192.168.33.12   hdp-02

192.168.33.13   hdp-03

192.168.33.14   hdp-04

而後,將hosts文件拷貝到集羣中的全部其餘機器上

scp /etc/hosts hdp-02:/etc/

scp /etc/hosts hdp-03:/etc/

scp /etc/hosts hdp-04:/etc/

 

提示:

若是在執行scp命令的時候,提示沒有scp命令,則能夠配置一個本地yum源來安裝

一、先在虛擬機中配置cdrom爲一個centos的安裝鏡像iso文件

二、在linux系統中將光驅掛在到文件系統中(某個目錄)

三、mkdir /mnt/cdrom

四、mount -t iso9660 -o loop /dev/cdrom /mnt/cdrom

五、檢驗掛載是否成功: ls /mnt/cdrom

六、三、配置yum的倉庫地址配置文件

七、yum的倉庫地址配置文件目錄: /etc/yum.repos.d

八、先將自帶的倉庫地址配置文件批量改名:

 

九、而後,拷貝一個出來進行修改

 

 

十、修改完配置文件後,再安裝scp命令:

十一、yum install openssh-clients -y

4.2.四、安裝hdfs集羣

 

一、上傳hadoop安裝包到hdp-01

bin文件爲hadoop功能命令,sbin中爲集羣管理命令。

二、修改配置文件

 

核心配置參數:

1)         指定hadoop的默認文件系統爲:hdfs

2)         指定hdfs的namenode節點爲哪臺機器

3)         指定namenode軟件存儲元數據的本地目錄

4)         指定datanode軟件存放文件塊的本地目錄

hadoop的配置文件在:/root/apps/hadoop安裝目錄/etc/hadoop/

hadoop中的其餘組件如mapreduce,yarn等,這些組將會去讀數據,指定hadoop的默認文件系統爲:hdfs,就是告訴這些組件去hdfs中讀數據;該項配置意味dadoop中的組件能夠訪問各類文件系統。

若不指定數據的存放目錄,hadoop默認將數據存放在/temp下。

能夠參考官網的默認配置信息。

1) 修改hadoop-env.sh
export JAVA_HOME=/root/apps/jdk1.8.0_60
2) 修改core-site.xml
<configuration>

<property>

<name>fs.defaultFS</name>

<value>hdfs://hdp-01:9000/</value>

</property>

</configuration>

 

<value>hdfs://hdp-01:9000</value>包含兩層意思:

  一、指定默認的文件系統。

  二、指明瞭namenode是誰。

value中的值是URI風格

3) 修改hdfs-site.xml

配置namenode和datanode的工做目錄,添加secondary name node。

<configuration>

<property>

<name>dfs.namenode.name.dir</name>

<value>/root/hdpdata/name/</value>

</property>

 

<property>

<name>dfs.datanode.data.dir</name>

<value>/root/hdpdata/data</value>

</property>

 

<property>

<name>dfs.namenode.secondary.http-address</name>

<value>hdp-02:50090</value>

</property>

 

</configuration>

 

4) 拷貝整個hadoop安裝目錄到其餘機器 
  scp -r /root/apps/hadoop-2.8.1  hdp-02:/root/apps/

  scp -r /root/apps/hadoop-2.8.1  hdp-03:/root/apps/

  scp -r /root/apps/hadoop-2.8.1  hdp-04:/root/apps/

 

5) 啓動HDFS

 

所謂的啓動HDFS,就是在對的機器上啓動對的軟件

提示:

要運行hadoop的命令,須要在linux環境中配置HADOOP_HOME和PATH環境變量

vi /etc/profile

 


export JAVA_HOME=/root/apps/jdk1.8.0_60 export HADOOP_HOME=/root/apps/hadoop-2.8.1 export PATH=$PATH:$JAVA_HOME/bin:$HADOOP_HOME/bin:$HADOOP_HOME/sbin

 

 

首先,初始化namenode的元數據目錄

要在hdp-01上執行hadoop的一個命令來初始化namenode的元數據存儲目錄

hadoop namenode -format

  建立一個全新的元數據存儲目錄

  生成記錄元數據的文件fsimage

  生成集羣的相關標識:如:集羣id——clusterID

該步驟叫作namenode的初始化也叫格式化,本質是創建namenode運行所須要的目錄以及一些必要的文件,因此該操做通常只在集羣第一次啓動以前執行。

 

 

而後,啓動namenode進程(在hdp-01上)

hadoop-daemon.sh start namenode

啓動完後,首先用jps查看一下namenode的進程是否存在

namenode就是一個java軟件,咱們知道啓動一個java軟件須要主類的main方法 java xxx.java - 若干參數,處於方便的考慮,hadoop中提供了一個通用的軟件啓動腳本hadoop-daemon.sh,腳本能夠接受參數,專門用來啓動hadoop中的軟件。

 

能夠看到namenode在監聽兩個端口,9000用來和客戶端通訊(9000爲RPC端口號,內部進程之間互相通訊的端口,datanode和namenode的通訊),接受hdfs客戶端的請求,50070是web服務端口,也就是說namenode內置一個web服務器,http客戶端能夠經過次端口發送請求。

而後,在windows中用瀏覽器訪問namenode提供的web端口:50070

http://hdp-01:50070

而後,啓動衆datanode們(在任意地方)

hadoop-daemon.sh start datanode

 

下圖是datanode的一下信息展現,能夠看到datanode內部通訊的端口號是50010,並且datanode也提供了問訪問端口50075.

6) 用自動批量啓動腳原本啓動HDFS

hdfs其實就是一堆java軟件,咱們能夠本身手動hadoop-daemon.sh逐個啓動,也可使用hadoop提供的批量啓動腳本。

1)         先配置hdp-01到集羣中全部機器(包含本身)的免密登錄

 

2)         配完免密後,能夠執行一次  ssh 0.0.0.0

3)         修改hadoop安裝目錄中/etc/hadoop/slaves(把須要啓動datanode進程的節點列入)

hdp-01

hdp-02

hdp-03

hdp-04

core-site.xml中配置過namenode,可是須要批量啓動那些datanode呢,該文件/etc/hadoop/slaves的配置就是解決這個問題的,該文件就是給啓動腳本看的。

4)         在hdp-01上用腳本:start-dfs.sh 來自動啓動整個集羣

5)         若是要中止,則用腳本:stop-dfs.sh

start-dfs.sh、stop-dfs.sh會啓動、關閉namenode,datanode和secondnamenode

固然你也能夠本身寫腳原本作上述的事情 ,以下所示。

 

 

五、hdfs的客戶端操做

hdfs裝好以後,接下來的工做就是hdfs裏傳東西,去東西,由客戶端來完成。

5.一、客戶端的理解

hdfs的客戶端有多種形式:

一、網頁形式

二、命令行形式

三、客戶端在哪裏運行,沒有約束,只要運行客戶端的機器可以跟hdfs集羣聯網們

   對於客戶端來說,hdfs是一個總體,網頁版的客戶端主要是用來查看hdfs信息的,能夠建立目錄,可是須要權限

命令行客戶端 

bin命令中的 hadoophdfs  均可以啓動 hdfs 客戶端,hadoop和hdfs都是腳本,都會去啓動一個hdfs的java客戶端。java客戶端在安裝包的jar包中

./hadoop fs -ls /

 

表示hadoop要訪問hdfs,該腳本就會去啓動hdfs客戶端,客戶端能夠接收參數,好比查看hdfs根目錄。

 

 

文件的切塊大小和存儲的副本數量,都是由客戶端決定!

所謂的由客戶端決定,是經過配置參數來定的

hdfs的客戶端會讀如下兩個參數,來決定切塊大小(默認128M)、副本數量(默認3):

切塊大小的參數: dfs.blocksize

副本數量的參數: dfs.replication

若是使用命令行客戶端時,上面兩個參數應該配置在客戶端機器的hadoop目錄中的hdfs-site.xml中配置,(命令行客戶端本質就是啓動了一個java客戶端,這個客戶端在啓動的時候會將它依賴的全部jar包加入classpath中,客戶端會從jar包中,加載xx-default.xml來得到默認的配置文件,也能夠在hadoop/etc/xxx-site.xml中配置具體的參數來覆蓋默認值。此時的/etc下的配置文件就是客戶自定義的配置文件,也會被java客戶端加載【客戶端能夠運行在任何地方】);

固然也能夠在具體代碼中指定,見6節中的核心代碼

<property>
<name>dfs.blocksize</name>
<value>64m</value>
</property>

<property>
<name>dfs.replication</name>
<value>2</value>
</property>

 5.1.一、上傳過程

下圖爲datanode中的數據,.meta是該block的校驗和信息。咱們能夠經過linux cat命令將兩個塊合併,會發現與原來的文件是同樣的。

 

 5.1.二、下載過程

客戶端首先回去namenode上去查找,有沒有請求的hdfs路徑下的文件,有的話都有該文件被切割成幾塊,每塊有幾個副本,這些副本都存放在集羣中的哪些機器上,而後去存放了第一塊數據的某一臺機器上去下載第一塊數據,將數據追加到本地,而後去下載下一塊數據,繼續追加到本地文件,知道下載完全部的塊。

 

 

5.二、hdfs客戶端的經常使用操做命令

一、上傳文件到hdfs中

hadoop fs -put /本地文件  /aaa

 

二、下載文件到客戶端本地磁盤

hadoop fs -get /hdfs中的路徑   /本地磁盤目錄

 

三、在hdfs中建立文件夾

hadoop fs -mkdir  -p /aaa/xxx

 

四、移動hdfs中的文件(改名)

hadoop fs -mv /hdfs的路徑1  /hdfs的另外一個路徑2

 

複製hdfs中的文件到hdfs的另外一個目錄

hadoop fs -cp /hdfs路徑_1  /hdfs路徑_2

 

五、刪除hdfs中的文件或文件夾

hadoop fs -rm -r /aaa

 

六、查看hdfs中的文本文件內容

hadoop fs -cat /demo.txt

hadoop fs -tail /demo.txt

hadoop fs -tail -f /demo.txt

hadoop fs -text /demo.txt

 

七、查看hdfs目錄下有哪些文件

hadoop fs –ls /

 

八、追加本地文件到hdfs中的文件

hadoop fs -appendToFile 本地路徑 /hdfs路徑

 

九、權限修改

 hadoop fs -chmod username1:usergroup1 /hdfs路徑

要說明的是,hdfs中的用戶和用戶組這是一個名字稱呼,與linux不同,linux中不能將選線分配給一個不存在的用戶。

 

 能夠查看hadoop fs 不帶任何參數,來查看hdfs所支持的命令

Usage: hadoop fs [generic options] [-appendToFile <localsrc> ... <dst>] [-cat [-ignoreCrc] <src> ...] [-checksum <src> ...] [-chgrp [-R] GROUP PATH...] [-chmod [-R] <MODE[,MODE]... | OCTALMODE> PATH...] [-chown [-R] [OWNER][:[GROUP]] PATH...] [-copyFromLocal [-f] [-p] [-l] [-d] <localsrc> ... <dst>] [-copyToLocal [-f] [-p] [-ignoreCrc] [-crc] <src> ... <localdst>] [-count [-q] [-h] [-v] [-t [<storage type>]] [-u] [-x] <path> ...] [-cp [-f] [-p | -p[topax]] [-d] <src> ... <dst>] [-createSnapshot <snapshotDir> [<snapshotName>]] [-deleteSnapshot <snapshotDir> <snapshotName>] [-df [-h] [<path> ...]] [-du [-s] [-h] [-x] <path> ...] [-expunge] [-find <path> ... <expression> ...] [-get [-f] [-p] [-ignoreCrc] [-crc] <src> ... <localdst>] [-getfacl [-R] <path>] [-getfattr [-R] {-n name | -d} [-e en] <path>] [-getmerge [-nl] [-skip-empty-file] <src> <localdst>] [-help [cmd ...]] [-ls [-C] [-d] [-h] [-q] [-R] [-t] [-S] [-r] [-u] [<path> ...]] [-mkdir [-p] <path> ...] [-moveFromLocal <localsrc> ... <dst>] [-moveToLocal <src> <localdst>] [-mv <src> ... <dst>] [-put [-f] [-p] [-l] [-d] <localsrc> ... <dst>] [-renameSnapshot <snapshotDir> <oldName> <newName>] [-rm [-f] [-r|-R] [-skipTrash] [-safely] <src> ...] [-rmdir [--ignore-fail-on-non-empty] <dir> ...] [-setfacl [-R] [{-b|-k} {-m|-x <acl_spec>} <path>]|[--set <acl_spec> <path>]] [-setfattr {-n name [-v value] | -x name} <path>] [-setrep [-R] [-w] <rep> <path> ...] [-stat [format] <path> ...] [-tail [-f] <file>] [-test -[defsz] <path>] [-text [-ignoreCrc] <src> ...] [-touchz <path> ...] [-truncate [-w] <length> <path> ...] [-usage [cmd ...]]

 

 

六、hdfs的java客戶端編程

HDFS客戶端編程應用場景:數據採集

業務系統中日誌生成機制

數據採集程序其實就是經過對java客戶端編程,將數據不斷的上傳到hdfs。

在windows開發環境中作一些準備工做:

一、在windows的某個路徑中解壓一份windows版本的hadoop安裝包

二、將解壓出的hadoop目錄配置到windows的環境變量中:HADOOP_HOME

緣由:若不配置環境變量,會在下載hdfs文件是出錯,是因爲使用hadoop的FileSystem保存文件到本地的時候出於效率的考慮,會使用hadoop安裝包中的c語言庫,顯然沒有配置hadoop環境變量時是找不到該c語言類庫中的文件的;然而上傳文件到hdfs沒有相似問題;

 

 6.一、核心代碼

一、將hdfs客戶端開發所需的jar導入工程(jar包可在hadoop安裝包中找到common和hdfs)

二、寫代碼

6.1.一、獲取hdfs客戶端

要點:要對hdfs中的文件進行操做,代碼中首先須要得到一個hdfs的客戶端對象

Configuration conf = new Configuration(); FileSystem fs = FileSystem.get(new URI("hdfs://hdp-01:9000"),conf,"root");

 

 完整代碼以下:

/** * Configuration參數對象的機制: * 構造時,會加載jar包中的默認配置 xx-default.xml(core-default.xmlhdfs-default.xml) * 再加載 用戶配置xx-site.xml ,覆蓋掉默認參數 * 構造完成以後,還能夠conf.set("p","v"),會再次覆蓋用戶配置文件中的參數值 */
        // new Configuration()會從項目的classpath中加載core-default.xml hdfs-default.xml core-site.xml hdfs-site.xml等文件
        Configuration conf = new Configuration(); // 指定本客戶端上傳文件到hdfs時須要保存的副本數爲:2
        conf.set("dfs.replication", "2"); // 指定本客戶端上傳文件到hdfs時切塊的規格大小:64M
        conf.set("dfs.blocksize", "64m"); // 構造一個訪問指定HDFS系統的客戶端對象: 參數1:——HDFS系統的URI,參數2:——客戶端要特別指定的參數,參數3:客戶端的身份(用戶名)
        FileSystem fs = FileSystem.get(new URI("hdfs://hdp-01:9000/"), conf, "root"); // 上傳一個文件到HDFS中
        fs.copyFromLocalFile(new Path("D:/install-pkgs/hbase-1.2.1-bin.tar.gz"), new Path("/aaa/")); fs.close();
View Code

 6.1.二、對文件進行操做

上傳、下載文件;文件夾的建立和刪除、文件的移動和複製、查看文件夾和文件等。

三、利用fs對象的方法進行文件操做

方法均與命令行方法對應,好比:

上傳文件

fs.copyFromLocalFile(new Path("本地路徑"),new Path("hdfs的路徑"));

 

下載文件

fs.copyToLocalFile(new Path("hdfs的路徑"),new Path("本地路徑"))

 對文件的增刪改查以下,對文件數據的操做後續介紹。

FileSystem fs = null; @Before public void init() throws Exception{ Configuration conf = new Configuration(); conf.set("dfs.replication", "2"); conf.set("dfs.blocksize", "64m"); fs = FileSystem.get(new URI("hdfs://hdp-01:9000/"), conf, "root"); } /** * 從HDFS中下載文件到客戶端本地磁盤 * @throws IOException * @throws IllegalArgumentException */ @Test public void testGet() throws IllegalArgumentException, IOException{ fs.copyToLocalFile(new Path("/hdp20-05.txt"), new Path("f:/")); fs.close(); } /** * 在hdfs內部移動文件\修更名稱 */ @Test public void testRename() throws Exception{ fs.rename(new Path("/install.log"), new Path("/aaa/in.log")); fs.close(); } /** * 在hdfs中建立文件夾 */ @Test public void testMkdir() throws Exception{ fs.mkdirs(new Path("/xx/yy/zz")); fs.close(); } /** * 在hdfs中刪除文件或文件夾 */ @Test public void testRm() throws Exception{ fs.delete(new Path("/aaa"), true); fs.close(); } /** * 查詢hdfs指定目錄下的文件信息 */ @Test public void testLs() throws Exception{ // 只查詢文件的信息,不返回文件夾的信息
        RemoteIterator<LocatedFileStatus> iter = fs.listFiles(new Path("/"), true); while(iter.hasNext()){ LocatedFileStatus status = iter.next(); System.out.println("文件全路徑:"+status.getPath()); System.out.println("塊大小:"+status.getBlockSize()); System.out.println("文件長度:"+status.getLen()); System.out.println("副本數量:"+status.getReplication()); System.out.println("塊信息:"+Arrays.toString(status.getBlockLocations())); System.out.println("--------------------------------"); } fs.close(); } /** * 查詢hdfs指定目錄下的文件和文件夾信息 */ @Test public void testLs2() throws Exception{ FileStatus[] listStatus = fs.listStatus(new Path("/")); for(FileStatus status:listStatus){ System.out.println("文件全路徑:"+status.getPath()); System.out.println(status.isDirectory()?"這是文件夾":"這是文件"); System.out.println("塊大小:"+status.getBlockSize()); System.out.println("文件長度:"+status.getLen()); System.out.println("副本數量:"+status.getReplication()); System.out.println("--------------------------------"); } fs.close(); }
View Code

 6.1.三、對文件數據進行操做

 同過客戶端使用open打開流對象來讀取hdfs中文件的具體數據,包括指定偏移量來讀取特定範圍的數據;經過客戶端向hdfs文件追加數據。

/** * 讀取hdfs中的文件的內容 * * @throws IOException * @throws IllegalArgumentException */ @Test public void testReadData() throws IllegalArgumentException, IOException { FSDataInputStream in = fs.open(new Path("/test.txt")); BufferedReader br = new BufferedReader(new InputStreamReader(in, "utf-8")); String line = null; while ((line = br.readLine()) != null) { System.out.println(line); } br.close(); in.close(); fs.close(); } /** * 讀取hdfs中文件的指定偏移量範圍的內容 * * * 做業題:用本例中的知識,實現讀取一個文本文件中的指定BLOCK塊中的全部數據 * * @throws IOException * @throws IllegalArgumentException */ @Test public void testRandomReadData() throws IllegalArgumentException, IOException { FSDataInputStream in = fs.open(new Path("/xx.dat")); // 將讀取的起始位置進行指定
        in.seek(12); // 讀16個字節
        byte[] buf = new byte[16]; in.read(buf); System.out.println(new String(buf)); in.close(); fs.close(); }
View Code

 

 寫數據,create提供了豐富的重載函數,輕鬆實現覆蓋,追加,以及指定緩存大小,副本數量等等信息。

/** * 往hdfs中的文件寫內容 * * @throws IOException * @throws IllegalArgumentException */ @Test public void testWriteData() throws IllegalArgumentException, IOException { FSDataOutputStream out = fs.create(new Path("/zz.jpg"), false); // D:\images\006l0mbogy1fhehjb6ikoj30ku0ku76b.jpg
 FileInputStream in = new FileInputStream("D:/images/006l0mbogy1fhehjb6ikoj30ku0ku76b.jpg"); byte[] buf = new byte[1024]; int read = 0; while ((read = in.read(buf)) != -1) { out.write(buf,0,read); } in.close(); out.close(); fs.close(); }
View Code

 

七、HDFS實例

hdfs版本wordcount程序。

任務描述:

一、從hdfs文件中讀取數據,每次讀取一行數據;

二、將數據交給具體的單詞統計業務去做業(使用面向接口編程,當業務邏輯改變時,無需修改主程序代碼);

三、並將該行數據產生的結果存入緩存中(能夠用hashmap模擬)

數據採集設計:

一、流程
啓動一個定時任務:
——定時探測日誌源目錄
——獲取須要採集的文件
——移動這些文件到一個待上傳臨時目錄
——遍歷待上傳目錄中各文件,逐一傳輸到HDFS的目標路徑,同時將傳輸完成的文件移動到備份目錄

啓動一個定時任務:
——探測備份目錄中的備份數據,檢查是否已超出最長備份時長,若是超出,則刪除


二、規劃各類路徑
日誌源路徑: d:/logs/accesslog/
待上傳臨時目錄: d:/logs/toupload/
備份目錄: d:/logs/backup/日期/

HDFS存儲路徑: /logs/日期
HDFS中的文件的前綴:access_log_
HDFS中的文件的後綴:.log

將路徑配置寫入屬性文件

MAPPER_CLASS=cn.edu360.hdfs.wordcount.CaseIgnorWcountMapper INPUT_PATH=/wordcount/input OUTPUT_PATH=/wordcount/output2
View Code

 

主程序代碼示例:

import java.io.BufferedReader; import java.io.InputStreamReader; import java.net.URI; import java.util.HashMap; import java.util.Map.Entry; import java.util.Properties; import java.util.Set; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.LocatedFileStatus; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.RemoteIterator; public class HdfsWordcount { public static void main(String[] args) throws Exception{ /** * 初始化工做 */ Properties props = new Properties(); props.load(HdfsWordcount.class.getClassLoader().getResourceAsStream("job.properties")); Path input = new Path(props.getProperty("INPUT_PATH")); Path output = new Path(props.getProperty("OUTPUT_PATH")); Class<?> mapper_class = Class.forName(props.getProperty("MAPPER_CLASS")); Mapper mapper = (Mapper) mapper_class.newInstance(); Context context =  new Context(); /** * 處理數據 */ FileSystem fs = FileSystem.get(new URI("hdfs://hdp-01:9000"), new Configuration(), "root"); RemoteIterator<LocatedFileStatus> iter = fs.listFiles(input, false); while(iter.hasNext()){ LocatedFileStatus file = iter.next(); FSDataInputStream in = fs.open(file.getPath()); BufferedReader br = new BufferedReader(new InputStreamReader(in)); String line = null; // 逐行讀取
            while ((line = br.readLine()) != null) { // 調用一個方法對每一行進行業務處理
 mapper.map(line, context); } br.close(); in.close(); } /** * 輸出結果 */ HashMap<Object, Object> contextMap = context.getContextMap(); if(fs.exists(output)){ throw new RuntimeException("指定的輸出目錄已存在,請更換......!"); } FSDataOutputStream out = fs.create(new Path(output,new Path("res.dat"))); Set<Entry<Object, Object>> entrySet = contextMap.entrySet(); for (Entry<Object, Object> entry : entrySet) { out.write((entry.getKey().toString()+"\t"+entry.getValue()+"\n").getBytes()); } out.close(); fs.close(); System.out.println("恭喜!數據統計完成....."); } }
View Code

 

自定義的業務接口

public interface Mapper { public void map(String line,Context context); }
View Code

 

業務實現類1

public class WordCountMapper implements Mapper{ @Override public void map(String line, Context context) { String[] words = line.split(" "); for (String word : words) { Object value = context.get(word); if(null==value){ context.write(word, 1); }else{ int v = (int)value; context.write(word, v+1); } } } }
View Code

 

業務實現類2

public class CaseIgnorWcountMapper implements Mapper { @Override public void map(String line, Context context) { String[] words = line.toUpperCase().split(" "); for (String word : words) { Object value = context.get(word); if (null == value) { context.write(word, 1); } else { int v = (int) value; context.write(word, v + 1); } } } }
View Code

 

緩存模擬

import java.util.HashMap; public class Context { private HashMap<Object,Object> contextMap = new HashMap<>(); public void write(Object key,Object value){ contextMap.put(key, value); } public Object get(Object key){ return contextMap.get(key); } public HashMap<Object,Object> getContextMap(){ return contextMap; } }
View Code

 

八、實戰描述

 需求描述:

在業務系統的服務器上,業務程序會不斷生成業務日誌(好比網站的頁面訪問日誌)

業務日誌是用log4j生成的,會不斷地切出日誌文件

須要按期(好比每小時)從業務服務器上的日誌目錄中,探測須要採集的日誌文件(access.log,不是直接採集數據),發往HDFS

 

注意點:業務服務器可能有多臺(hdfs上的文件名不能直接用日誌服務器上的文件名)

當天採集到的日誌要放在hdfs的當天目錄中

採集完成的日誌文件,須要移動到到日誌服務器的一個備份目錄中

按期檢查(一小時檢查一次)備份目錄,將備份時長超出24小時的日誌文件清除

Timer timer = new Timer() timer.schedual()

 簡易版日誌採集主程序

import java.util.Timer; public class DataCollectMain { public static void main(String[] args) { Timer timer = new Timer(); timer.schedule(new CollectTask(), 0, 60*60*1000L); timer.schedule(new BackupCleanTask(), 0, 60*60*1000L); } }
View Code

 

日誌收集定時任務類

import java.io.File; import java.io.FilenameFilter; import java.net.URI; import java.text.SimpleDateFormat; import java.util.Arrays; import java.util.Date; import java.util.Properties; import java.util.TimerTask; import java.util.UUID; import org.apache.commons.io.FileUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.log4j.Logger; public class CollectTask extends TimerTask { @Override public void run() { /** * ——定時探測日誌源目錄 ——獲取須要採集的文件 ——移動這些文件到一個待上傳臨時目錄 * ——遍歷待上傳目錄中各文件,逐一傳輸到HDFS的目標路徑,同時將傳輸完成的文件移動到備份目錄 * */
        try { // 獲取配置參數
            Properties props = PropertyHolderLazy.getProps(); // 構造一個log4j日誌對象
            Logger logger = Logger.getLogger("logRollingFile"); // 獲取本次採集時的日期
            SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd-HH"); String day = sdf.format(new Date()); File srcDir = new File(props.getProperty(Constants.LOG_SOURCE_DIR)); // 列出日誌源目錄中須要採集的文件
            File[] listFiles = srcDir.listFiles(new FilenameFilter() { @Override public boolean accept(File dir, String name) { if (name.startsWith(props.getProperty(Constants.LOG_LEGAL_PREFIX))) { return true; } return false; } }); // 記錄日誌
            logger.info("探測到以下文件須要採集:" + Arrays.toString(listFiles)); // 將要採集的文件移動到待上傳臨時目錄
            File toUploadDir = new File(props.getProperty(Constants.LOG_TOUPLOAD_DIR)); for (File file : listFiles) { FileUtils.moveFileToDirectory(file, toUploadDir, true); } // 記錄日誌
            logger.info("上述文件移動到了待上傳目錄" + toUploadDir.getAbsolutePath()); // 構造一個HDFS的客戶端對象
 FileSystem fs = FileSystem.get(new URI(props.getProperty(Constants.HDFS_URI)), new Configuration(), "root"); File[] toUploadFiles = toUploadDir.listFiles(); // 檢查HDFS中的日期目錄是否存在,若是不存在,則建立
            Path hdfsDestPath = new Path(props.getProperty(Constants.HDFS_DEST_BASE_DIR) + day); if (!fs.exists(hdfsDestPath)) { fs.mkdirs(hdfsDestPath); } // 檢查本地的備份目錄是否存在,若是不存在,則建立
            File backupDir = new File(props.getProperty(Constants.LOG_BACKUP_BASE_DIR) + day + "/"); if (!backupDir.exists()) { backupDir.mkdirs(); } for (File file : toUploadFiles) { // 傳輸文件到HDFS並更名access_log_
                Path destPath = new Path(hdfsDestPath + "/" + UUID.randomUUID() + props.getProperty(Constants.HDFS_FILE_SUFFIX)); fs.copyFromLocalFile(new Path(file.getAbsolutePath()), destPath); // 記錄日誌
                logger.info("文件傳輸到HDFS完成:" + file.getAbsolutePath() + "-->" + destPath); // 將傳輸完成的文件移動到備份目錄
                FileUtils.moveFileToDirectory(file, backupDir, true); // 記錄日誌
                logger.info("文件備份完成:" + file.getAbsolutePath() + "-->" + backupDir); } } catch (Exception e) { e.printStackTrace(); } } }
View Code

 

按期清理過期備份日誌

import java.io.File; import java.text.SimpleDateFormat; import java.util.Date; import java.util.TimerTask; import org.apache.commons.io.FileUtils; public class BackupCleanTask extends TimerTask { @Override public void run() { SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd-HH"); long now = new Date().getTime(); try { // 探測本地備份目錄
            File backupBaseDir = new File("d:/logs/backup/"); File[] dayBackDir = backupBaseDir.listFiles(); // 判斷備份日期子目錄是否已超24小時
            for (File dir : dayBackDir) { long time = sdf.parse(dir.getName()).getTime(); if(now-time>24*60*60*1000L){ FileUtils.deleteDirectory(dir); } } } catch (Exception e) { e.printStackTrace(); } } }
View Code

 

配置信息提取到屬性配置文件中,並寫成常量,以單例設計模式去加載配置信息。

LOG_SOURCE_DIR=d:/logs/accesslog/ LOG_TOUPLOAD_DIR=d:/logs/toupload/ LOG_BACKUP_BASE_DIR=d:/logs/backup/ LOG_BACKUP_TIMEOUT=24 LOG_LEGAL_PREFIX=access.log. HDFS_URI=hdfs://hdp-01:9000/
HDFS_DEST_BASE_DIR=/logs/ HDFS_FILE_PREFIX=access_log_ HDFS_FILE_SUFFIX=.log
View Code
public class Constants { /** * 日誌源目錄參數key */
    public static final String LOG_SOURCE_DIR = "LOG_SOURCE_DIR"; /** * 日誌待上傳目錄參數key */
    public static final String LOG_TOUPLOAD_DIR = "LOG_TOUPLOAD_DIR"; public static final String LOG_BACKUP_BASE_DIR = "LOG_BACKUP_BASE_DIR"; public static final String LOG_BACKUP_TIMEOUT = "LOG_BACKUP_TIMEOUT"; public static final String LOG_LEGAL_PREFIX = "LOG_LEGAL_PREFIX"; public static final String HDFS_URI = "HDFS_URI"; public static final String HDFS_DEST_BASE_DIR = "HDFS_DEST_BASE_DIR"; public static final String HDFS_FILE_PREFIX = "HDFS_FILE_PREFIX"; public static final String HDFS_FILE_SUFFIX = "HDFS_FILE_SUFFIX"; }
View Code
import java.util.Properties; /** * 單例模式:懶漢式——考慮了線程安全 * @author ThinkPad * */

public class PropertyHolderLazy { private static Properties prop = null; public static Properties getProps() throws Exception { if (prop == null) { synchronized (PropertyHolderLazy.class) { if (prop == null) { prop = new Properties(); prop.load(PropertyHolderLazy.class.getClassLoader().getResourceAsStream("collect.properties")); } } } return prop; } }
View Code

 

import java.util.Properties; /** * 單例設計模式,方式一: 餓漢式單例 * @author ThinkPad * */
public class PropertyHolderHungery { private static Properties prop = new Properties(); static { try { prop.load(PropertyHolderHungery.class.getClassLoader().getResourceAsStream("collect.properties")); } catch (Exception e) { } } public static Properties getProps() throws Exception { return prop; } }
View Code

九、總結

hdfs有服務端和客戶端;

服務端:

  成員:namenode 管理元數據,datanode存儲數據

  配置:須要指定使用的文件系統(默認的配置在core-default.xml,爲本地文件系統,須要修改服務器core-site.xml修改成hdfs文件系統,並指定namenode),namenode和datanode的工做目錄(服務器的默認配置在hdfs-default.xml中,默認是在/temp下,須要就該hdfs-site.xml來覆蓋默認值。);

  細節:第一次啓動集羣時須要格式化namenode

客戶端:

  形式:網頁端,命令行,java客戶端api;客戶端能夠運行在任何地方。

  功能:指定上傳文件的信息報括切塊大小(hdfs-default.xml中默認值128m,能夠在hdfs-site.xml中修改,也能夠咋java api 中建立客戶端對象的時候指定,總之由客戶端來指定),副本數量(hdfs-default.xml中默認值3,一樣能夠修改覆蓋);完成hdfs中文件的系列操做,如上傳,下載

雖然服務端和客戶端的共用配置 core-default.xml core-site.xml;hdfs-default.xml hdfs-site.xml,可是不一樣的程序所須要的參數不一樣,只不過爲了方便,全部參數都寫在一個文件中了。便是在服務器的hdfs-site.xml中配置了切塊大小和副本數量,服務器的namenode和datanode根本不關心也不使用這些參數,只有啓動服務器上的命令行客戶端時,該參數纔可能起做用。

相關文章
相關標籤/搜索