主要內容:hdfs的總體運行機制,DATANODE存儲文件塊的觀察,hdfs集羣的搭建與配置,hdfs命令行客戶端常見命令;業務系統中日誌生成機制,HDFS的java客戶端api基本使用。java
《數據處理》node
在互聯網技術發展到現今階段,大量平常、工做等事務產生的數據都已經信息化,人類產生的數據量相比之前有了爆炸式的增加,之前的傳統的數據處理技術已經沒法勝任,需求催生技術,一套用來處理海量數據的軟件工具應運而生,這就是大數據!linux
處理海量數據的核心技術:web
海量數據存儲:分佈式算法
海量數據運算:分佈式sql
大數據的海量數據的存儲和運算,核心技術就是分佈式。數據庫
這些核心技術的實現是不須要用戶從零開始造輪子的express
存儲和運算,都已經有大量的成熟的框架來用apache
存儲框架:編程
HDFS——分佈式文件存儲系統(HADOOP中的存儲框架)
HBASE——分佈式數據庫系統
KAFKA——分佈式消息緩存系統(實時流式數據處理場景中應用普遍)
文件系統中的數據以非結構化居多,沒有直觀的結構,數據庫中的信息多以表的形式存在,具備結構化,存在規律;
查詢的時候文本文件只能一行一行掃描,而數據庫效率高不少,能夠利用sql查詢語法,數據庫在存和取方便的多。
數據庫和文件系統相比,數據庫至關於在特定的文件系統上的軟件封裝。其實HBASE就是對HDFS的進一層封裝,它的底層文件系統就是HDFS。
分佈式消息緩存系統,既然是分佈式,那就意味着橫跨不少機器,意味着容量能夠很大。和前二者相比它的數據存儲形式是消息(不是表,也不是文件),消息能夠看作有固定格式的一條數據,好比消息頭,消息體等,消息體能夠是json,數據庫的一條記錄,一個序列化對象等。消息最終存放在kafaka內部的特定的文件系統裏。
運算框架:(要解決的核心問題就是幫用戶將處理邏輯在不少機器上並行)
MAPREDUCE—— 離線批處理/HADOOP中的運算框架
SPARK —— 離線批處理/實時流式計算
STORM —— 實時流式計算
離線批處理:數據是靜態的,一次處理一大批數據。
實時流式:數據在源源不斷的生成,邊生成,邊計算
這些運算框架的思想都差很少,特別是mapreduce和spark,簡單來看spark是對mapreduce的進一步封裝;
運算框架和存儲框架之間沒有強耦合關係,spark能夠讀HDFS,HBASE,KAFKA裏的數據,固然須要存儲框架提供訪問接口。
輔助類的工具(解放大數據工程師的一些繁瑣工做):
HIVE —— 數據倉庫工具:能夠接收sql,翻譯成mapreduce或者spark程序運行
FLUME——數據採集
SQOOP——數據遷移
ELASTIC SEARCH —— 分佈式的搜索引擎
flume用於自動採集數據源機器上的數據到大數據集羣中。
HIVE看起來像一個數據庫,但其實不是,Hive中存了一些須要分析的數據,而後在直接寫sql進行分析,hive接收sql,翻譯成mapreduce或者spark程序運行;
hive本質是mapreduce或spark,咱們只須要寫sql邏輯而不是mapreduce邏輯,Hive自動完成對sql的翻譯,並且仍是在海量數據集上。
.......
換個角度說,大數據是:
一、有海量的數據
二、有對海量數據進行挖掘的需求
三、有對海量數據進行挖掘的軟件工具(hadoop、spark、storm、flink、tez、impala......)
數據處理的最典型應用:公司的產品運營狀況分析
電商推薦系統:基於海量的瀏覽行爲、購物行爲數據,進行大量的算法模型的運算,得出各種推薦結論,以供電商網站頁面來爲用戶進行商品推薦
精準廣告推送系統:基於海量的互聯網用戶的各種數據,統計分析,進行用戶畫像(獲得用戶的各類屬性標籤),而後能夠爲廣告主進行有針對性的精準的廣告投放
hadoop中有3個核心組件:
分佈式文件系統:HDFS —— 實現將文件分佈式存儲在不少的服務器上
分佈式運算編程框架:MAPREDUCE —— 實如今不少機器上分佈式並行運算
分佈式資源調度平臺:YARN —— 幫用戶調度大量的mapreduce程序,併合理分配運算資源
hdfs:分佈式文件系統
hdfs有着文件系統共同的特徵:
一、有目錄結構,頂層目錄是: /
二、系統中存放的就是文件
三、系統能夠提供對文件的:建立、刪除、修改、查看、移動等功能
hdfs跟普通的單機文件系統有區別:
一、單機文件系統中存放的文件,是在一臺機器的操做系統中
二、hdfs的文件系統會橫跨N多的機器
三、單機文件系統中存放的文件,是在一臺機器的磁盤上
四、hdfs文件系統中存放的文件,是落在n多機器的本地單機文件系統中(hdfs是一個基於linux本地文件系統之上的文件系統)
hdfs的工做機制:
一、客戶把一個文件存入hdfs,其實hdfs會把這個文件切塊後,分散存儲在N臺linux機器系統中(負責存儲文件塊的角色:data node)<準確來講:切塊的行爲是由客戶端決定的>
二、一旦文件被切塊存儲,那麼,hdfs中就必須有一個機制,來記錄用戶的每個文件的切塊信息,及每一塊的具體存儲機器(負責記錄塊信息的角色是:name node)
三、爲了保證數據的安全性,hdfs能夠將每個文件塊在集羣中存放多個副本(到底存幾個副本,是由當時存入該文件的客戶端指定的)
綜述:一個hdfs系統,由一臺運行了namenode的服務器,和N臺運行了datanode的服務器組成!
學習階段,用虛擬機便可!
先準備4臺虛擬機:1個namenode節點 + 3 個datanode 節點
主機名:hdp-01 對應的ip地址:192.168.33.11
主機名:hdp-02 對應的ip地址:192.168.33.12
主機名:hdp-03 對應的ip地址:192.168.33.13
主機名:hdp-04 對應的ip地址:192.168.33.14
4.2.三、從windows中用CRT軟件進行遠程鏈接
在windows中將各臺linux機器的主機名配置到的windows的本地域名映射文件中:
c:/windows/system32/drivers/etc/hosts
192.168.33.11 hdp-01 192.168.33.12 hdp-02 192.168.33.13 hdp-03 192.168.33.14 hdp-04 |
用crt鏈接上後,修改一下crt的顯示配置(字號,編碼集改成UTF-8):
關閉防火牆:service iptables stop
關閉防火牆自啓: chkconfig iptables off
1) 利用alt+p 打開sftp窗口,而後將jdk壓縮包拖入sftp窗口
2) 而後在linux中將jdk壓縮包解壓到/root/apps 下
3) 配置環境變量:JAVA_HOME PATH
vi /etc/profile 在文件的最後,加入:
export JAVA_HOME=/root/apps/jdk1.8.0_60 export PATH=$PATH:$JAVA_HOME/bin
4) 修改完成後,記得 source /etc/profile使配置生效
5) 檢驗:在任意目錄下輸入命令: java -version 看是否成功執行
6) 將安裝好的jdk目錄用scp命令拷貝到其餘機器
7) 將/etc/profile配置文件也用scp命令拷貝到其餘機器並分別執行source命令
在hdp-01上,vi /etc/hosts
127.0.0.1 localhost localhost.localdomain localhost4 localhost4.localdomain4 ::1 localhost localhost.localdomain localhost6 localhost6.localdomain6 192.168.33.11 hdp-01 192.168.33.12 hdp-02 192.168.33.13 hdp-03 192.168.33.14 hdp-04 |
而後,將hosts文件拷貝到集羣中的全部其餘機器上
scp /etc/hosts hdp-02:/etc/
scp /etc/hosts hdp-03:/etc/
scp /etc/hosts hdp-04:/etc/
補 充 提示: |
若是在執行scp命令的時候,提示沒有scp命令,則能夠配置一個本地yum源來安裝 一、先在虛擬機中配置cdrom爲一個centos的安裝鏡像iso文件 二、在linux系統中將光驅掛在到文件系統中(某個目錄) 三、mkdir /mnt/cdrom 四、mount -t iso9660 -o loop /dev/cdrom /mnt/cdrom 五、檢驗掛載是否成功: ls /mnt/cdrom 六、三、配置yum的倉庫地址配置文件 七、yum的倉庫地址配置文件目錄: /etc/yum.repos.d 八、先將自帶的倉庫地址配置文件批量改名: 九、而後,拷貝一個出來進行修改 十、修改完配置文件後,再安裝scp命令: 十一、yum install openssh-clients -y |
bin文件爲hadoop功能命令,sbin中爲集羣管理命令。
要 點 提 示 |
核心配置參數: 1) 指定hadoop的默認文件系統爲:hdfs 2) 指定hdfs的namenode節點爲哪臺機器 3) 指定namenode軟件存儲元數據的本地目錄 4) 指定datanode軟件存放文件塊的本地目錄 |
hadoop的配置文件在:/root/apps/hadoop安裝目錄/etc/hadoop/
hadoop中的其餘組件如mapreduce,yarn等,這些組將會去讀數據,指定hadoop的默認文件系統爲:hdfs,就是告訴這些組件去hdfs中讀數據;該項配置意味dadoop中的組件能夠訪問各類文件系統。
若不指定數據的存放目錄,hadoop默認將數據存放在/temp下。
能夠參考官網的默認配置信息。
export JAVA_HOME=/root/apps/jdk1.8.0_60
<configuration> <property> <name>fs.defaultFS</name> <value>hdfs://hdp-01:9000/</value> </property> </configuration>
<value>hdfs://hdp-01:9000</value>包含兩層意思:
一、指定默認的文件系統。
二、指明瞭namenode是誰。
value中的值是URI風格
配置namenode和datanode的工做目錄,添加secondary name node。
<configuration> <property> <name>dfs.namenode.name.dir</name> <value>/root/hdpdata/name/</value> </property> <property> <name>dfs.datanode.data.dir</name> <value>/root/hdpdata/data</value> </property> <property> <name>dfs.namenode.secondary.http-address</name> <value>hdp-02:50090</value> </property> </configuration>
scp -r /root/apps/hadoop-2.8.1 hdp-02:/root/apps/ scp -r /root/apps/hadoop-2.8.1 hdp-03:/root/apps/ scp -r /root/apps/hadoop-2.8.1 hdp-04:/root/apps/
所謂的啓動HDFS,就是在對的機器上啓動對的軟件
要 點 提示: |
要運行hadoop的命令,須要在linux環境中配置HADOOP_HOME和PATH環境變量 vi /etc/profile
|
首先,初始化namenode的元數據目錄
要在hdp-01上執行hadoop的一個命令來初始化namenode的元數據存儲目錄
hadoop namenode -format
建立一個全新的元數據存儲目錄
生成記錄元數據的文件fsimage
生成集羣的相關標識:如:集羣id——clusterID
該步驟叫作namenode的初始化也叫格式化,本質是創建namenode運行所須要的目錄以及一些必要的文件,因此該操做通常只在集羣第一次啓動以前執行。
而後,啓動namenode進程(在hdp-01上)
hadoop-daemon.sh start namenode
啓動完後,首先用jps查看一下namenode的進程是否存在
namenode就是一個java軟件,咱們知道啓動一個java軟件須要主類的main方法 java xxx.java - 若干參數,處於方便的考慮,hadoop中提供了一個通用的軟件啓動腳本hadoop-daemon.sh,腳本能夠接受參數,專門用來啓動hadoop中的軟件。
能夠看到namenode在監聽兩個端口,9000用來和客戶端通訊(9000爲RPC端口號,內部進程之間互相通訊的端口,datanode和namenode的通訊),接受hdfs客戶端的請求,50070是web服務端口,也就是說namenode內置一個web服務器,http客戶端能夠經過次端口發送請求。
而後,在windows中用瀏覽器訪問namenode提供的web端口:50070
http://hdp-01:50070
而後,啓動衆datanode們(在任意地方)
hadoop-daemon.sh start datanode
下圖是datanode的一下信息展現,能夠看到datanode內部通訊的端口號是50010,並且datanode也提供了問訪問端口50075.
hdfs其實就是一堆java軟件,咱們能夠本身手動hadoop-daemon.sh逐個啓動,也可使用hadoop提供的批量啓動腳本。
1) 先配置hdp-01到集羣中全部機器(包含本身)的免密登錄
2) 配完免密後,能夠執行一次 ssh 0.0.0.0
3) 修改hadoop安裝目錄中/etc/hadoop/slaves(把須要啓動datanode進程的節點列入)
hdp-01 hdp-02 hdp-03 hdp-04 |
core-site.xml中配置過namenode,可是須要批量啓動那些datanode呢,該文件/etc/hadoop/slaves的配置就是解決這個問題的,該文件就是給啓動腳本看的。
4) 在hdp-01上用腳本:start-dfs.sh 來自動啓動整個集羣
5) 若是要中止,則用腳本:stop-dfs.sh
start-dfs.sh、stop-dfs.sh會啓動、關閉namenode,datanode和secondnamenode
固然你也能夠本身寫腳原本作上述的事情 ,以下所示。
hdfs裝好以後,接下來的工做就是hdfs裏傳東西,去東西,由客戶端來完成。
hdfs的客戶端有多種形式:
一、網頁形式
二、命令行形式
三、客戶端在哪裏運行,沒有約束,只要運行客戶端的機器可以跟hdfs集羣聯網們
對於客戶端來說,hdfs是一個總體,網頁版的客戶端主要是用來查看hdfs信息的,能夠建立目錄,可是須要權限
命令行客戶端
bin命令中的 hadoop 和 hdfs 均可以啓動 hdfs 客戶端,hadoop和hdfs都是腳本,都會去啓動一個hdfs的java客戶端。java客戶端在安裝包的jar包中
./hadoop fs -ls /
表示hadoop要訪問hdfs,該腳本就會去啓動hdfs客戶端,客戶端能夠接收參數,好比查看hdfs根目錄。
文件的切塊大小和存儲的副本數量,都是由客戶端決定!
所謂的由客戶端決定,是經過配置參數來定的
hdfs的客戶端會讀如下兩個參數,來決定切塊大小(默認128M)、副本數量(默認3):
切塊大小的參數: dfs.blocksize
副本數量的參數: dfs.replication
若是使用命令行客戶端時,上面兩個參數應該配置在客戶端機器的hadoop目錄中的hdfs-site.xml中配置,(命令行客戶端本質就是啓動了一個java客戶端,這個客戶端在啓動的時候會將它依賴的全部jar包加入classpath中,客戶端會從jar包中,加載xx-default.xml來得到默認的配置文件,也能夠在hadoop/etc/xxx-site.xml中配置具體的參數來覆蓋默認值。此時的/etc下的配置文件就是客戶自定義的配置文件,也會被java客戶端加載【客戶端能夠運行在任何地方】);
固然也能夠在具體代碼中指定,見6節中的核心代碼
<property> <name>dfs.blocksize</name> <value>64m</value> </property> <property> <name>dfs.replication</name> <value>2</value> </property>
下圖爲datanode中的數據,.meta是該block的校驗和信息。咱們能夠經過linux cat命令將兩個塊合併,會發現與原來的文件是同樣的。
客戶端首先回去namenode上去查找,有沒有請求的hdfs路徑下的文件,有的話都有該文件被切割成幾塊,每塊有幾個副本,這些副本都存放在集羣中的哪些機器上,而後去存放了第一塊數據的某一臺機器上去下載第一塊數據,將數據追加到本地,而後去下載下一塊數據,繼續追加到本地文件,知道下載完全部的塊。
一、上傳文件到hdfs中
hadoop fs -put /本地文件 /aaa
二、下載文件到客戶端本地磁盤
hadoop fs -get /hdfs中的路徑 /本地磁盤目錄
三、在hdfs中建立文件夾
hadoop fs -mkdir -p /aaa/xxx
四、移動hdfs中的文件(改名)
hadoop fs -mv /hdfs的路徑1 /hdfs的另外一個路徑2
複製hdfs中的文件到hdfs的另外一個目錄
hadoop fs -cp /hdfs路徑_1 /hdfs路徑_2
五、刪除hdfs中的文件或文件夾
hadoop fs -rm -r /aaa
六、查看hdfs中的文本文件內容
hadoop fs -cat /demo.txt
hadoop fs -tail /demo.txt
hadoop fs -tail -f /demo.txt
hadoop fs -text /demo.txt
七、查看hdfs目錄下有哪些文件
hadoop fs –ls /
八、追加本地文件到hdfs中的文件
hadoop fs -appendToFile 本地路徑 /hdfs路徑
九、權限修改
hadoop fs -chmod username1:usergroup1 /hdfs路徑
要說明的是,hdfs中的用戶和用戶組這是一個名字稱呼,與linux不同,linux中不能將選線分配給一個不存在的用戶。
能夠查看hadoop fs 不帶任何參數,來查看hdfs所支持的命令
Usage: hadoop fs [generic options] [-appendToFile <localsrc> ... <dst>] [-cat [-ignoreCrc] <src> ...] [-checksum <src> ...] [-chgrp [-R] GROUP PATH...] [-chmod [-R] <MODE[,MODE]... | OCTALMODE> PATH...] [-chown [-R] [OWNER][:[GROUP]] PATH...] [-copyFromLocal [-f] [-p] [-l] [-d] <localsrc> ... <dst>] [-copyToLocal [-f] [-p] [-ignoreCrc] [-crc] <src> ... <localdst>] [-count [-q] [-h] [-v] [-t [<storage type>]] [-u] [-x] <path> ...] [-cp [-f] [-p | -p[topax]] [-d] <src> ... <dst>] [-createSnapshot <snapshotDir> [<snapshotName>]] [-deleteSnapshot <snapshotDir> <snapshotName>] [-df [-h] [<path> ...]] [-du [-s] [-h] [-x] <path> ...] [-expunge] [-find <path> ... <expression> ...] [-get [-f] [-p] [-ignoreCrc] [-crc] <src> ... <localdst>] [-getfacl [-R] <path>] [-getfattr [-R] {-n name | -d} [-e en] <path>] [-getmerge [-nl] [-skip-empty-file] <src> <localdst>] [-help [cmd ...]] [-ls [-C] [-d] [-h] [-q] [-R] [-t] [-S] [-r] [-u] [<path> ...]] [-mkdir [-p] <path> ...] [-moveFromLocal <localsrc> ... <dst>] [-moveToLocal <src> <localdst>] [-mv <src> ... <dst>] [-put [-f] [-p] [-l] [-d] <localsrc> ... <dst>] [-renameSnapshot <snapshotDir> <oldName> <newName>] [-rm [-f] [-r|-R] [-skipTrash] [-safely] <src> ...] [-rmdir [--ignore-fail-on-non-empty] <dir> ...] [-setfacl [-R] [{-b|-k} {-m|-x <acl_spec>} <path>]|[--set <acl_spec> <path>]] [-setfattr {-n name [-v value] | -x name} <path>] [-setrep [-R] [-w] <rep> <path> ...] [-stat [format] <path> ...] [-tail [-f] <file>] [-test -[defsz] <path>] [-text [-ignoreCrc] <src> ...] [-touchz <path> ...] [-truncate [-w] <length> <path> ...] [-usage [cmd ...]]
HDFS客戶端編程應用場景:數據採集
業務系統中日誌生成機制
數據採集程序其實就是經過對java客戶端編程,將數據不斷的上傳到hdfs。
在windows開發環境中作一些準備工做:
一、在windows的某個路徑中解壓一份windows版本的hadoop安裝包
二、將解壓出的hadoop目錄配置到windows的環境變量中:HADOOP_HOME
緣由:若不配置環境變量,會在下載hdfs文件是出錯,是因爲使用hadoop的FileSystem保存文件到本地的時候出於效率的考慮,會使用hadoop安裝包中的c語言庫,顯然沒有配置hadoop環境變量時是找不到該c語言類庫中的文件的;然而上傳文件到hdfs沒有相似問題;
一、將hdfs客戶端開發所需的jar導入工程(jar包可在hadoop安裝包中找到common和hdfs)
二、寫代碼
要點:要對hdfs中的文件進行操做,代碼中首先須要得到一個hdfs的客戶端對象
Configuration conf = new Configuration(); FileSystem fs = FileSystem.get(new URI("hdfs://hdp-01:9000"),conf,"root");
完整代碼以下:
/** * Configuration參數對象的機制: * 構造時,會加載jar包中的默認配置 xx-default.xml(core-default.xmlhdfs-default.xml) * 再加載 用戶配置xx-site.xml ,覆蓋掉默認參數 * 構造完成以後,還能夠conf.set("p","v"),會再次覆蓋用戶配置文件中的參數值 */ // new Configuration()會從項目的classpath中加載core-default.xml hdfs-default.xml core-site.xml hdfs-site.xml等文件 Configuration conf = new Configuration(); // 指定本客戶端上傳文件到hdfs時須要保存的副本數爲:2 conf.set("dfs.replication", "2"); // 指定本客戶端上傳文件到hdfs時切塊的規格大小:64M conf.set("dfs.blocksize", "64m"); // 構造一個訪問指定HDFS系統的客戶端對象: 參數1:——HDFS系統的URI,參數2:——客戶端要特別指定的參數,參數3:客戶端的身份(用戶名) FileSystem fs = FileSystem.get(new URI("hdfs://hdp-01:9000/"), conf, "root"); // 上傳一個文件到HDFS中 fs.copyFromLocalFile(new Path("D:/install-pkgs/hbase-1.2.1-bin.tar.gz"), new Path("/aaa/")); fs.close();
上傳、下載文件;文件夾的建立和刪除、文件的移動和複製、查看文件夾和文件等。
三、利用fs對象的方法進行文件操做
方法均與命令行方法對應,好比:
上傳文件
fs.copyFromLocalFile(new Path("本地路徑"),new Path("hdfs的路徑"));
下載文件
fs.copyToLocalFile(new Path("hdfs的路徑"),new Path("本地路徑"))
對文件的增刪改查以下,對文件數據的操做後續介紹。
FileSystem fs = null; @Before public void init() throws Exception{ Configuration conf = new Configuration(); conf.set("dfs.replication", "2"); conf.set("dfs.blocksize", "64m"); fs = FileSystem.get(new URI("hdfs://hdp-01:9000/"), conf, "root"); } /** * 從HDFS中下載文件到客戶端本地磁盤 * @throws IOException * @throws IllegalArgumentException */ @Test public void testGet() throws IllegalArgumentException, IOException{ fs.copyToLocalFile(new Path("/hdp20-05.txt"), new Path("f:/")); fs.close(); } /** * 在hdfs內部移動文件\修更名稱 */ @Test public void testRename() throws Exception{ fs.rename(new Path("/install.log"), new Path("/aaa/in.log")); fs.close(); } /** * 在hdfs中建立文件夾 */ @Test public void testMkdir() throws Exception{ fs.mkdirs(new Path("/xx/yy/zz")); fs.close(); } /** * 在hdfs中刪除文件或文件夾 */ @Test public void testRm() throws Exception{ fs.delete(new Path("/aaa"), true); fs.close(); } /** * 查詢hdfs指定目錄下的文件信息 */ @Test public void testLs() throws Exception{ // 只查詢文件的信息,不返回文件夾的信息 RemoteIterator<LocatedFileStatus> iter = fs.listFiles(new Path("/"), true); while(iter.hasNext()){ LocatedFileStatus status = iter.next(); System.out.println("文件全路徑:"+status.getPath()); System.out.println("塊大小:"+status.getBlockSize()); System.out.println("文件長度:"+status.getLen()); System.out.println("副本數量:"+status.getReplication()); System.out.println("塊信息:"+Arrays.toString(status.getBlockLocations())); System.out.println("--------------------------------"); } fs.close(); } /** * 查詢hdfs指定目錄下的文件和文件夾信息 */ @Test public void testLs2() throws Exception{ FileStatus[] listStatus = fs.listStatus(new Path("/")); for(FileStatus status:listStatus){ System.out.println("文件全路徑:"+status.getPath()); System.out.println(status.isDirectory()?"這是文件夾":"這是文件"); System.out.println("塊大小:"+status.getBlockSize()); System.out.println("文件長度:"+status.getLen()); System.out.println("副本數量:"+status.getReplication()); System.out.println("--------------------------------"); } fs.close(); }
同過客戶端使用open打開流對象來讀取hdfs中文件的具體數據,包括指定偏移量來讀取特定範圍的數據;經過客戶端向hdfs文件追加數據。
/** * 讀取hdfs中的文件的內容 * * @throws IOException * @throws IllegalArgumentException */ @Test public void testReadData() throws IllegalArgumentException, IOException { FSDataInputStream in = fs.open(new Path("/test.txt")); BufferedReader br = new BufferedReader(new InputStreamReader(in, "utf-8")); String line = null; while ((line = br.readLine()) != null) { System.out.println(line); } br.close(); in.close(); fs.close(); } /** * 讀取hdfs中文件的指定偏移量範圍的內容 * * * 做業題:用本例中的知識,實現讀取一個文本文件中的指定BLOCK塊中的全部數據 * * @throws IOException * @throws IllegalArgumentException */ @Test public void testRandomReadData() throws IllegalArgumentException, IOException { FSDataInputStream in = fs.open(new Path("/xx.dat")); // 將讀取的起始位置進行指定 in.seek(12); // 讀16個字節 byte[] buf = new byte[16]; in.read(buf); System.out.println(new String(buf)); in.close(); fs.close(); }
寫數據,create提供了豐富的重載函數,輕鬆實現覆蓋,追加,以及指定緩存大小,副本數量等等信息。
/** * 往hdfs中的文件寫內容 * * @throws IOException * @throws IllegalArgumentException */ @Test public void testWriteData() throws IllegalArgumentException, IOException { FSDataOutputStream out = fs.create(new Path("/zz.jpg"), false); // D:\images\006l0mbogy1fhehjb6ikoj30ku0ku76b.jpg FileInputStream in = new FileInputStream("D:/images/006l0mbogy1fhehjb6ikoj30ku0ku76b.jpg"); byte[] buf = new byte[1024]; int read = 0; while ((read = in.read(buf)) != -1) { out.write(buf,0,read); } in.close(); out.close(); fs.close(); }
hdfs版本wordcount程序。
任務描述:
一、從hdfs文件中讀取數據,每次讀取一行數據;
二、將數據交給具體的單詞統計業務去做業(使用面向接口編程,當業務邏輯改變時,無需修改主程序代碼);
三、並將該行數據產生的結果存入緩存中(能夠用hashmap模擬)
數據採集設計:
一、流程
啓動一個定時任務:
——定時探測日誌源目錄
——獲取須要採集的文件
——移動這些文件到一個待上傳臨時目錄
——遍歷待上傳目錄中各文件,逐一傳輸到HDFS的目標路徑,同時將傳輸完成的文件移動到備份目錄
啓動一個定時任務:
——探測備份目錄中的備份數據,檢查是否已超出最長備份時長,若是超出,則刪除
二、規劃各類路徑
日誌源路徑: d:/logs/accesslog/
待上傳臨時目錄: d:/logs/toupload/
備份目錄: d:/logs/backup/日期/
HDFS存儲路徑: /logs/日期
HDFS中的文件的前綴:access_log_
HDFS中的文件的後綴:.log
將路徑配置寫入屬性文件
MAPPER_CLASS=cn.edu360.hdfs.wordcount.CaseIgnorWcountMapper INPUT_PATH=/wordcount/input OUTPUT_PATH=/wordcount/output2
主程序代碼示例:
import java.io.BufferedReader; import java.io.InputStreamReader; import java.net.URI; import java.util.HashMap; import java.util.Map.Entry; import java.util.Properties; import java.util.Set; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.LocatedFileStatus; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.RemoteIterator; public class HdfsWordcount { public static void main(String[] args) throws Exception{ /** * 初始化工做 */ Properties props = new Properties(); props.load(HdfsWordcount.class.getClassLoader().getResourceAsStream("job.properties")); Path input = new Path(props.getProperty("INPUT_PATH")); Path output = new Path(props.getProperty("OUTPUT_PATH")); Class<?> mapper_class = Class.forName(props.getProperty("MAPPER_CLASS")); Mapper mapper = (Mapper) mapper_class.newInstance(); Context context = new Context(); /** * 處理數據 */ FileSystem fs = FileSystem.get(new URI("hdfs://hdp-01:9000"), new Configuration(), "root"); RemoteIterator<LocatedFileStatus> iter = fs.listFiles(input, false); while(iter.hasNext()){ LocatedFileStatus file = iter.next(); FSDataInputStream in = fs.open(file.getPath()); BufferedReader br = new BufferedReader(new InputStreamReader(in)); String line = null; // 逐行讀取 while ((line = br.readLine()) != null) { // 調用一個方法對每一行進行業務處理 mapper.map(line, context); } br.close(); in.close(); } /** * 輸出結果 */ HashMap<Object, Object> contextMap = context.getContextMap(); if(fs.exists(output)){ throw new RuntimeException("指定的輸出目錄已存在,請更換......!"); } FSDataOutputStream out = fs.create(new Path(output,new Path("res.dat"))); Set<Entry<Object, Object>> entrySet = contextMap.entrySet(); for (Entry<Object, Object> entry : entrySet) { out.write((entry.getKey().toString()+"\t"+entry.getValue()+"\n").getBytes()); } out.close(); fs.close(); System.out.println("恭喜!數據統計完成....."); } }
自定義的業務接口
public interface Mapper { public void map(String line,Context context); }
業務實現類1
public class WordCountMapper implements Mapper{ @Override public void map(String line, Context context) { String[] words = line.split(" "); for (String word : words) { Object value = context.get(word); if(null==value){ context.write(word, 1); }else{ int v = (int)value; context.write(word, v+1); } } } }
業務實現類2
public class CaseIgnorWcountMapper implements Mapper { @Override public void map(String line, Context context) { String[] words = line.toUpperCase().split(" "); for (String word : words) { Object value = context.get(word); if (null == value) { context.write(word, 1); } else { int v = (int) value; context.write(word, v + 1); } } } }
緩存模擬
import java.util.HashMap; public class Context { private HashMap<Object,Object> contextMap = new HashMap<>(); public void write(Object key,Object value){ contextMap.put(key, value); } public Object get(Object key){ return contextMap.get(key); } public HashMap<Object,Object> getContextMap(){ return contextMap; } }
需求描述:
在業務系統的服務器上,業務程序會不斷生成業務日誌(好比網站的頁面訪問日誌)
業務日誌是用log4j生成的,會不斷地切出日誌文件
須要按期(好比每小時)從業務服務器上的日誌目錄中,探測須要採集的日誌文件(access.log,不是直接採集數據),發往HDFS
注意點:業務服務器可能有多臺(hdfs上的文件名不能直接用日誌服務器上的文件名)
當天採集到的日誌要放在hdfs的當天目錄中
採集完成的日誌文件,須要移動到到日誌服務器的一個備份目錄中
按期檢查(一小時檢查一次)備份目錄,將備份時長超出24小時的日誌文件清除
Timer timer = new Timer() timer.schedual()
簡易版日誌採集主程序
import java.util.Timer; public class DataCollectMain { public static void main(String[] args) { Timer timer = new Timer(); timer.schedule(new CollectTask(), 0, 60*60*1000L); timer.schedule(new BackupCleanTask(), 0, 60*60*1000L); } }
日誌收集定時任務類
import java.io.File; import java.io.FilenameFilter; import java.net.URI; import java.text.SimpleDateFormat; import java.util.Arrays; import java.util.Date; import java.util.Properties; import java.util.TimerTask; import java.util.UUID; import org.apache.commons.io.FileUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.log4j.Logger; public class CollectTask extends TimerTask { @Override public void run() { /** * ——定時探測日誌源目錄 ——獲取須要採集的文件 ——移動這些文件到一個待上傳臨時目錄 * ——遍歷待上傳目錄中各文件,逐一傳輸到HDFS的目標路徑,同時將傳輸完成的文件移動到備份目錄 * */ try { // 獲取配置參數 Properties props = PropertyHolderLazy.getProps(); // 構造一個log4j日誌對象 Logger logger = Logger.getLogger("logRollingFile"); // 獲取本次採集時的日期 SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd-HH"); String day = sdf.format(new Date()); File srcDir = new File(props.getProperty(Constants.LOG_SOURCE_DIR)); // 列出日誌源目錄中須要採集的文件 File[] listFiles = srcDir.listFiles(new FilenameFilter() { @Override public boolean accept(File dir, String name) { if (name.startsWith(props.getProperty(Constants.LOG_LEGAL_PREFIX))) { return true; } return false; } }); // 記錄日誌 logger.info("探測到以下文件須要採集:" + Arrays.toString(listFiles)); // 將要採集的文件移動到待上傳臨時目錄 File toUploadDir = new File(props.getProperty(Constants.LOG_TOUPLOAD_DIR)); for (File file : listFiles) { FileUtils.moveFileToDirectory(file, toUploadDir, true); } // 記錄日誌 logger.info("上述文件移動到了待上傳目錄" + toUploadDir.getAbsolutePath()); // 構造一個HDFS的客戶端對象 FileSystem fs = FileSystem.get(new URI(props.getProperty(Constants.HDFS_URI)), new Configuration(), "root"); File[] toUploadFiles = toUploadDir.listFiles(); // 檢查HDFS中的日期目錄是否存在,若是不存在,則建立 Path hdfsDestPath = new Path(props.getProperty(Constants.HDFS_DEST_BASE_DIR) + day); if (!fs.exists(hdfsDestPath)) { fs.mkdirs(hdfsDestPath); } // 檢查本地的備份目錄是否存在,若是不存在,則建立 File backupDir = new File(props.getProperty(Constants.LOG_BACKUP_BASE_DIR) + day + "/"); if (!backupDir.exists()) { backupDir.mkdirs(); } for (File file : toUploadFiles) { // 傳輸文件到HDFS並更名access_log_ Path destPath = new Path(hdfsDestPath + "/" + UUID.randomUUID() + props.getProperty(Constants.HDFS_FILE_SUFFIX)); fs.copyFromLocalFile(new Path(file.getAbsolutePath()), destPath); // 記錄日誌 logger.info("文件傳輸到HDFS完成:" + file.getAbsolutePath() + "-->" + destPath); // 將傳輸完成的文件移動到備份目錄 FileUtils.moveFileToDirectory(file, backupDir, true); // 記錄日誌 logger.info("文件備份完成:" + file.getAbsolutePath() + "-->" + backupDir); } } catch (Exception e) { e.printStackTrace(); } } }
按期清理過期備份日誌
import java.io.File; import java.text.SimpleDateFormat; import java.util.Date; import java.util.TimerTask; import org.apache.commons.io.FileUtils; public class BackupCleanTask extends TimerTask { @Override public void run() { SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd-HH"); long now = new Date().getTime(); try { // 探測本地備份目錄 File backupBaseDir = new File("d:/logs/backup/"); File[] dayBackDir = backupBaseDir.listFiles(); // 判斷備份日期子目錄是否已超24小時 for (File dir : dayBackDir) { long time = sdf.parse(dir.getName()).getTime(); if(now-time>24*60*60*1000L){ FileUtils.deleteDirectory(dir); } } } catch (Exception e) { e.printStackTrace(); } } }
配置信息提取到屬性配置文件中,並寫成常量,以單例設計模式去加載配置信息。
LOG_SOURCE_DIR=d:/logs/accesslog/ LOG_TOUPLOAD_DIR=d:/logs/toupload/ LOG_BACKUP_BASE_DIR=d:/logs/backup/ LOG_BACKUP_TIMEOUT=24 LOG_LEGAL_PREFIX=access.log. HDFS_URI=hdfs://hdp-01:9000/ HDFS_DEST_BASE_DIR=/logs/ HDFS_FILE_PREFIX=access_log_ HDFS_FILE_SUFFIX=.log
public class Constants { /** * 日誌源目錄參數key */ public static final String LOG_SOURCE_DIR = "LOG_SOURCE_DIR"; /** * 日誌待上傳目錄參數key */ public static final String LOG_TOUPLOAD_DIR = "LOG_TOUPLOAD_DIR"; public static final String LOG_BACKUP_BASE_DIR = "LOG_BACKUP_BASE_DIR"; public static final String LOG_BACKUP_TIMEOUT = "LOG_BACKUP_TIMEOUT"; public static final String LOG_LEGAL_PREFIX = "LOG_LEGAL_PREFIX"; public static final String HDFS_URI = "HDFS_URI"; public static final String HDFS_DEST_BASE_DIR = "HDFS_DEST_BASE_DIR"; public static final String HDFS_FILE_PREFIX = "HDFS_FILE_PREFIX"; public static final String HDFS_FILE_SUFFIX = "HDFS_FILE_SUFFIX"; }
import java.util.Properties; /** * 單例模式:懶漢式——考慮了線程安全 * @author ThinkPad * */ public class PropertyHolderLazy { private static Properties prop = null; public static Properties getProps() throws Exception { if (prop == null) { synchronized (PropertyHolderLazy.class) { if (prop == null) { prop = new Properties(); prop.load(PropertyHolderLazy.class.getClassLoader().getResourceAsStream("collect.properties")); } } } return prop; } }
import java.util.Properties; /** * 單例設計模式,方式一: 餓漢式單例 * @author ThinkPad * */ public class PropertyHolderHungery { private static Properties prop = new Properties(); static { try { prop.load(PropertyHolderHungery.class.getClassLoader().getResourceAsStream("collect.properties")); } catch (Exception e) { } } public static Properties getProps() throws Exception { return prop; } }
hdfs有服務端和客戶端;
服務端:
成員:namenode 管理元數據,datanode存儲數據
配置:須要指定使用的文件系統(默認的配置在core-default.xml,爲本地文件系統,須要修改服務器core-site.xml修改成hdfs文件系統,並指定namenode),namenode和datanode的工做目錄(服務器的默認配置在hdfs-default.xml中,默認是在/temp下,須要就該hdfs-site.xml來覆蓋默認值。);
細節:第一次啓動集羣時須要格式化namenode
客戶端:
形式:網頁端,命令行,java客戶端api;客戶端能夠運行在任何地方。
功能:指定上傳文件的信息報括切塊大小(hdfs-default.xml中默認值128m,能夠在hdfs-site.xml中修改,也能夠咋java api 中建立客戶端對象的時候指定,總之由客戶端來指定),副本數量(hdfs-default.xml中默認值3,一樣能夠修改覆蓋);完成hdfs中文件的系列操做,如上傳,下載
雖然服務端和客戶端的共用配置 core-default.xml core-site.xml;hdfs-default.xml hdfs-site.xml,可是不一樣的程序所須要的參數不一樣,只不過爲了方便,全部參數都寫在一個文件中了。便是在服務器的hdfs-site.xml中配置了切塊大小和副本數量,服務器的namenode和datanode根本不關心也不使用這些參數,只有啓動服務器上的命令行客戶端時,該參數纔可能起做用。