以前工做中,有接觸到大數據的需求,雖然當時咱們體系有專門的大數據部門,可是因爲當時咱們中臺重構,整個體系的開發量巨大,共用一個大數據部門,人手已經忙不過來,無法辦,爲了趕時間,我本身負責的系統的大數據相關操做,由咱們本身承擔了。此前對大數據的知識瞭解的不多,因而晚上回去花時間突擊大數據知識,白天就開始上手幹,一邊學一邊作,總算在部門規定的時間,跟系統一塊兒上線了。後來的維護迭代就交給大數據去了,雖然接觸大數據的時間不長,可是對我來講,確是頗有意思的一段經歷,以爲把當時匆匆學的知識點,再仔細回顧回顧,整理下。html
大數據: 就是對海量數據進行分析處理,獲得一些有價值的信息,而後幫助企業作出判斷和決策.node
處理流程:程序員
1:獲取數據數據庫
2:處理數據apache
3:展現結果編程
Hadoop是一個分佈式系基礎框架,它容許使用簡單的編程模型跨大型計算機的大型數據集進行分佈式處理.它主要解決兩個問題vim
大數據存儲問題: HDFS數組
大數據計算問題:MapReduce緩存
假設一個文件很是很是大,大小爲1PB/a.txt, 大到世界上全部的高級計算機都存儲不下, 怎麼辦?安全
爲了保存大文件, 須要把文件放在多個機器上
文件要分塊 block(128M)
不一樣的塊放在不一樣的 HDFS 節點
同時爲了對外提供統一的訪問, 讓外部能夠像是訪問本機同樣訪問分佈式文件系統
有一個統一的 HDFS Master
它保存整個系統的文件信息
全部的文件元數據的修改都從 Master 開始
從一個網絡日誌文件中計算獨立 IP, 以及其出現的次數 若是數據量特別大,咱們能夠將,整個任務拆開, 劃分爲比較小的任務, 從而進行計算呢。
若是可以在不一樣的節點上並行執行, 更有更大的提高, 如何把這些任務跑在集羣中?
Hadoop分佈式文件系統(HDFS) 提供對應用程序數據的高吞吐量訪問的分佈式文件系統
Hadoop Common 其餘Hadoop模塊所需的Java庫和實用程序。這些庫提供文件系統和操做系統級抽象,幷包含啓動Hadoop所需的必要Java文件和腳本
Hadoop MapReduce 基於YARN的大型數據集並行處理系統
Hadoop YARN 做業調度和集羣資源管理的框架
Hadoop最先起源於Nutch。Nutch的設計目標是構建一個大型的全網搜索引擎,包括網頁抓取、索引、查詢等功能,但隨着抓取網頁數量的增長,遇到了嚴重的可擴展性問題——如何解決數十億網頁的存儲和索引問題。
2003年、2004年穀歌發表的兩篇論文爲該問題提供了可行的解決方案。
——分佈式文件系統(GFS),可用於處理海量網頁的存儲
——分佈式計算框架MAPREDUCE,可用於處理海量網頁的索引計算問題。
狹義上來講,hadoop就是單獨指代hadoop這個軟件,
HDFS :分佈式文件系統
MapReduce : 分佈式計算系統
廣義上來講,hadoop指代大數據的一個生態圈,包括不少其餘的軟件
1.x的版本架構模型介紹
文件系統核心模塊:
NameNode:集羣當中的主節點,管理元數據(文件的大小,文件的位置,文件的權限),主要用於管理集羣當中的各類數據
secondaryNameNode:主要能用於hadoop當中元數據信息的輔助管理
DataNode:集羣當中的從節點,主要用於存儲集羣當中的各類數據
數據計算核心模塊:
JobTracker:接收用戶的計算請求任務,並分配任務給從節點
TaskTracker:負責執行主節點JobTracker分配的任務
2.x的版本架構模型介紹
第一種:NameNode與ResourceManager單節點架構模型
文件系統核心模塊:
NameNode:集羣當中的主節點,主要用於管理集羣當中的各類數據
secondaryNameNode:主要能用於hadoop當中元數據信息的輔助管理
DataNode:集羣當中的從節點,主要用於存儲集羣當中的各類數據
數據計算核心模塊:
ResourceManager:接收用戶的計算請求任務,並負責集羣的資源分配
NodeManager:負責執行主節點APPmaster分配的任務
第二種:NameNode單節點與ResourceManager高可用架構模型
文件系統核心模塊:
NameNode:集羣當中的主節點,主要用於管理集羣當中的各類數據
secondaryNameNode:主要能用於hadoop當中元數據信息的輔助管理
DataNode:集羣當中的從節點,主要用於存儲集羣當中的各類數據
數據計算核心模塊:
ResourceManager:接收用戶的計算請求任務,並負責集羣的資源分配,以及計算任務的劃分,經過zookeeper實現ResourceManager的高可用
NodeManager:負責執行主節點ResourceManager分配的任務
第三種:NameNode高可用與ResourceManager單節點架構模型
文件系統核心模塊:
NameNode:集羣當中的主節點,主要用於管理集羣當中的各類數據,其中nameNode能夠有兩個,造成高可用狀態
DataNode:集羣當中的從節點,主要用於存儲集羣當中的各類數據
JournalNode:文件系統元數據信息管理
數據計算核心模塊:
ResourceManager:接收用戶的計算請求任務,並負責集羣的資源分配,以及計算任務的劃分
NodeManager:負責執行主節點ResourceManager分配的任務
第四種:NameNode與ResourceManager高可用架構模型
文件系統核心模塊:
NameNode:集羣當中的主節點,主要用於管理集羣當中的各類數據,通常都是使用兩個,實現HA高可用
JournalNode:元數據信息管理進程,通常都是奇數個
DataNode:從節點,用於數據的存儲
數據計算核心模塊:
ResourceManager:Yarn平臺的主節點,主要用於接收各類任務,經過兩個,構建成高可用
NodeManager:Yarn平臺的從節點,主要用於處理ResourceManager分配的任務
HDFS(Hadoop Distributed File System) 是一個 Apache Software Foundation 項目, 是 Apache Hadoop 項目的一個子項目. Hadoop 很是適於存儲大型數據 (好比 TB 和 PB), 其就是使用 HDFS 做爲存儲系統. HDFS 使用多臺計算機存儲文件, 而且提供統一的訪問接口, 像是訪問一個普通文件系統同樣使用分佈式文件系統. HDFS 對數據文件的訪問經過流的方式進行處理, 這意味着經過命令和 MapReduce 程序的方式能夠直接使用 HDFS. HDFS 是容錯的, 且提供對大數據集的高吞吐量訪問.
HDFS 的一個很是重要的特色就是一次寫入、屢次讀取, 該模型下降了對併發控制的要求, 簡化了數據聚合性, 支持高吞吐量訪問. 而吞吐量是大數據系統的一個很是重要的指標, 吞吐量高意味着能處理的數據量就大.
經過跨多個廉價計算機集羣分佈數據和處理來節約成本
經過自動維護多個數據副本和在故障發生時來實現可靠性
它們爲存儲和處理超大規模數據提供所需的擴展能力。
Doug Cutting 在作 Lucene 的時候, 須要編寫一個爬蟲服務, 這個爬蟲寫的並不順利, 遇到了一些問題, 諸如: 如何存儲大規模的數據, 如何保證集羣的可伸縮性, 如何動態容錯等
2013年的時候, Google 發佈了三篇論文, 被稱做爲三駕馬車, 其中有一篇叫作 GFS, 是描述了 Google 內部的一個叫作 GFS 的分佈式大規模文件系統, 具備強大的可伸縮性和容錯性
Doug Cutting 後來根據 GFS 的論文, 創造了一個新的文件系統, 叫作 HDFS
NameNode 是一箇中心服務器, 單一節點(簡化系統的設計和實現), 負責管理文件系統的名字空間(NameSpace)以及客戶端對文件的訪問
文件操做, NameNode 是負責文件元數據的操做, DataNode 負責處理文件內容的讀寫請求, 跟文件內容相關的數據流不通過 NameNode, 只詢問它跟哪一個 DataNode聯繫, 不然 NameNode 會成爲系統的瓶頸
副本存放在哪些 DataNode 上由 NameNode 來控制, 根據全局狀況做出塊放置決定, 讀取文件時 NameNode 儘可能讓用戶先讀取最近的副本, 下降讀取網絡開銷和讀取延時
NameNode 全權管理數據庫的複製, 它週期性的從集羣中的每一個DataNode 接收心跳信合和狀態報告, 接收到心跳信號意味着 DataNode 節點工做正常, 塊狀態報告包含了一個該 DataNode 上全部的數據列表
全部的文件都是以 block 塊的方式存放在 HDFS 文件系統當中, 在 Hadoop1 當中, 文件的 block 塊默認大小是64M, hadoop2 當中, 文件的 block 塊大小默認是 128M, block 塊的大小能夠經過 hdfs-site.xml 當中的配置文件進行指定
<property>
<name>dfs.block.size</name>
<value>塊大小 以字節爲單位</value>
</property>
複製代碼
(1)引入塊機制的好處
一個文件有可能大於集羣中任意一個磁盤
使用塊抽象而不是文件能夠簡化存儲子系統
塊很是適合用於數據備份進而提供數據容錯能力和可用性
(2)塊緩存
一般 DataNode 從磁盤中讀取塊, 但對於訪問頻繁的文件, 其對應的塊可能被顯式的緩存在 DataNode 的內存中, 以堆外塊緩存的形式存在. 默認狀況下,一個塊僅緩存在一個 DataNode 的內存中,固然能夠針對每一個文件配置 DataNode 的數量. 做業調度器經過在緩存塊的 DataNode 上運行任務, 能夠利用塊緩存的優點提升讀操做的性能.
例如:
鏈接(join) 操做中使用的一個小的查詢表就是塊緩存的一個很好的候選
用戶或應用經過在緩存池中增長一個 Cache Directive 來告訴 NameNode 須要緩存哪些文件及存多久. 緩存池(Cache Pool) 是一個擁有管理緩存權限和資源使用的管理性分組.
例如一個文件 130M, 會被切分紅 2 個 block 塊, 保存在兩個 block 塊裏面, 實際佔用磁盤 130M 空間, 而不是佔用256M的磁盤空間
(3)HDFS 文件權限驗證
HDFS 的文件權限機制與 Linux 系統的文件權限機制相似
r:read w:write x:execute
複製代碼
權限 x 對於文件表示忽略, 對於文件夾表示是否有權限訪問其內容 若是 Linux 系統用戶 zhangsan 使用 Hadoop 命令建立一個文件, 那麼這個文件在 HDFS 當中的 Owner 就是 zhangsan HDFS 文件權限的目的, 防止好人作錯事, 而不是阻止壞人作壞事. HDFS相信你告訴我你是誰, 你就是誰
當 Hadoop 的集羣當中, 只有一個 NameNode 的時候, 全部的元數據信息都保存在了 FsImage 與 Eidts 文件當中, 這兩個文件就記錄了全部的數據的元數據信息, 元數據信息的保存目錄配置在了 hdfs-site.xml 當中
<property>
<name>dfs.namenode.name.dir</name>
<value>file:///export/servers/hadoop-3.1.1/datas/namenode/namenodedatas</value>
</property>
<property>
<name>dfs.namenode.edits.dir</name>
<value>file:///export/servers/hadoop-3.1.1/datas/dfs/nn/edits</value>
</property>
複製代碼
(1) FsImage 和 Edits 詳解
editsedits 存放了客戶端最近一段時間的操做日誌客戶端對 HDFS 進行寫文件時會首先被記錄在 edits 文件中edits 修改時元數據也會更新每次 HDFS 更新時 edits 先更新後客戶端纔會看到最新信息
fsimageNameNode 中關於元數據的鏡像, 通常稱爲檢查點, fsimage 存放了一份比較完整的元數據信息由於 fsimage 是 NameNode 的完整的鏡像, 若是每次都加載到內存生成樹狀拓撲結構,這是很是耗內存和CPU, 因此通常開始時對 NameNode 的操做都放在 edits 中fsimage 內容包含了 NameNode 管理下的全部 DataNode 文件及文件 block 及 block 所在的 DataNode 的元數據信息.隨着 edits 內容增大, 就須要在必定時間點和 fsimage 合併
(2)fsimage 中的文件信息查看
官方查看文檔
使用命令 hdfs oiv
cd /export/servers/hadoop-3.1.1/datas/namenode/namenodedatas
hdfs oiv -i fsimage_0000000000000000864 -p XML -o hello.xml
複製代碼
(3) edits 中的文件信息查看
官方查看文檔
使用命令 hdfs oev
cd /export/servers/hadoop-3.1.1/datas/dfs/nn/edits
hdfs oev -i edits_0000000000000000865-0000000000000000866 -o myedit.xml -p XML
複製代碼
(4) SecondaryNameNode 如何輔助管理 fsimage 與 edits 文件?
SecondaryNameNode 按期合併 fsimage 和 edits, 把 edits 控制在一個範圍內
配置 SecondaryNameNodeSecondaryNameNode 在 conf/masters 中指定在 masters 指定的機器上, 修改 hdfs-site.xml dfs.http.address host:50070 修改 core-site.xml, 這一步不作配置保持默認也能夠 fs.checkpoint.period 3600 fs.checkpoint.size 67108864
SecondaryNameNode 通知 NameNode 切換 editlog
SecondaryNameNode 從 NameNode 中得到 fsimage 和 editlog(經過http方式)
SecondaryNameNode 將 fsimage 載入內存, 而後開始合併 editlog, 合併以後成爲新的 fsimage
SecondaryNameNode 將新的 fsimage 發回給 NameNode
NameNode 用新的 fsimage 替換舊的 fsimage
(5)特色
完成合並的是 SecondaryNameNode, 會請求 NameNode 中止使用 edits, 暫時將新寫操做放入一個新的文件中 edits.new
SecondaryNameNode 從 NameNode 中經過 Http GET 得到 edits, 由於要和 fsimage 合併, 因此也是經過 Http Get 的方式把 fsimage 加載到內存, 而後逐一執行具體對文件系統的操做, 與 fsimage 合併, 生成新的 fsimage, 而後經過 Http POST 的方式把 fsimage 發送給 NameNode. NameNode 從 SecondaryNameNode 得到了 fsimage 後會把原有的 fsimage 替換爲新的 fsimage, 把 edits.new 變成 edits. 同時會更新 fstime
Hadoop 進入安全模式時須要管理員使用 dfsadmin 的 save namespace 來建立新的檢查點
SecondaryNameNode 在合併 edits 和 fsimage 時須要消耗的內存和 NameNode 差很少, 因此通常把 NameNode 和 SecondaryNameNode 放在不一樣的機器上
Client 發起文件上傳請求, 經過 RPC 與 NameNode 創建通信, NameNode 檢查目標文件是否已存在, 父目錄是否存在, 返回是否能夠上傳
Client 請求第一個 block 該傳輸到哪些 DataNode 服務器上
NameNode 根據配置文件中指定的備份數量及機架感知原理進行文件分配, 返回可用的 DataNode 的地址如: A, B, C
Hadoop 在設計時考慮到數據的安全與高效, 數據文件默認在 HDFS 上存放三份, 存儲策略爲本地一份, 同機架內其它某一節點上一份, 不一樣機架的某一節點上一份。
Client 請求 3 臺 DataNode 中的一臺 A 上傳數據(本質上是一個 RPC 調用,創建 pipeline ), A 收到請求會繼續調用 B, 而後 B 調用 C, 將整個 pipeline 創建完成, 後逐級返回 client
Client 開始往 A 上傳第一個 block(先從磁盤讀取數據放到一個本地內存緩存), 以 packet 爲單位(默認64K), A 收到一個 packet 就會傳給 B, B 傳給 C. A 每傳一個 packet 會放入一個應答隊列等待應答
數據被分割成一個個 packet 數據包在 pipeline 上依次傳輸, 在 pipeline 反方向上, 逐個發送 ack(命令正確應答), 最終由 pipeline 中第一個 DataNode 節點 A 將 pipelineack 發送給 Client
當一個 block 傳輸完成以後, Client 再次請求 NameNode 上傳第二個 block 到服務 1
Client向NameNode發起RPC請求,來肯定請求文件block所在的位置;
NameNode會視狀況返回文件的部分或者所有block列表,對於每一個block,NameNode 都會返回含有該 block 副本的 DataNode 地址; 這些返回的 DN 地址,會按照集羣拓撲結構得出 DataNode 與客戶端的距離,而後進行排序,排序兩個規則:網絡拓撲結構中距離 Client 近的排靠前;心跳機制中超時彙報的 DN 狀態爲 STALE,這樣的排靠後;
Client 選取排序靠前的 DataNode 來讀取 block,若是客戶端自己就是DataNode,那麼將從本地直接獲取數據(短路讀取特性);
底層上本質是創建 Socket Stream(FSDataInputStream),重複的調用父類 DataInputStream 的 read 方法,直到這個塊上的數據讀取完畢;
當讀完列表的 block 後,若文件讀取尚未結束,客戶端會繼續向NameNode 獲取下一批的 block 列表;
讀取完一個 block 都會進行 checksum 驗證,若是讀取 DataNode 時出現錯誤,客戶端會通知 NameNode,而後再從下一個擁有該 block 副本的DataNode 繼續讀。
read 方法是並行的讀取 block 信息,不是一塊一塊的讀取;NameNode 只是返回Client請求包含塊的DataNode地址,並非返回請求塊的數據;
最終讀取來全部的 block 會合併成一個完整的最終文件。
(1)導入 Maven 依賴
<repositories>
<repository>
<id>cloudera</id>
<url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
</repository>
</repositories>
<dependencies>
<dependency>
<groupId>jdk.tools</groupId>
<artifactId>jdk.tools</artifactId>
<version>1.8</version>
<scope>system</scope>
<systemPath>${JAVA_HOME}/lib/tools.jar</systemPath>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>3.0.0</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs</artifactId>
<version>3.0.0</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs-client</artifactId>
<version>3.0.0</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>3.0.0</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.12</version>
<scope>test</scope>
</dependency>
</dependencies>
複製代碼
(2)概述
在 Java 中操做 HDFS, 主要涉及如下 Class:
Configuration該類的對象封轉了客戶端或者服務器的配置
FileSystem該類的對象是一個文件系統對象, 能夠用該對象的一些方法來對文件進行操做, 經過 FileSystem 的靜態方法 get 得到該對象FileSystem fs = FileSystem.get(conf) get 方法從 conf 中的一個參數 fs.defaultFS 的配置值判斷具體是什麼類型的文件系統若是咱們的代碼中沒有指定 fs.defaultFS, 而且工程 ClassPath 下也沒有給定相應的配置, conf 中的默認值就來自於 Hadoop 的 Jar 包中的 core-default.xml默認值爲 file:///, 則獲取的不是一個 DistributedFileSystem 的實例, 而是一個本地文件系統的客戶端對象
(3)獲取 FileSystem 的幾種方式
第一種方式
@Test public void getFileSystem() throws URISyntaxException, IOException { Configuration configuration = new Configuration(); FileSystem fileSystem = FileSystem.get(new URI("hdfs://192.168.52.250:8020"), configuration); System.out.println(fileSystem.toString()); }
第二種方式
@Test public void getFileSystem2() throws URISyntaxException, IOException { Configuration configuration = new Configuration(); configuration.set("fs.defaultFS","hdfs://192.168.52.250:8020"); FileSystem fileSystem = FileSystem.get(new URI("/"), configuration); System.out.println(fileSystem.toString()); }
第三種方式
@Test public void getFileSystem3() throws URISyntaxException, IOException { Configuration configuration = new Configuration(); FileSystem fileSystem = FileSystem.newInstance(new URI("hdfs://192.168.52.250:8020"), configuration); System.out.println(fileSystem.toString()); }
第四種方式
@Test public void getFileSystem4() throws Exception{ Configuration configuration = new Configuration(); configuration.set("fs.defaultFS","hdfs://192.168.52.250:8020"); FileSystem fileSystem = FileSystem.newInstance(configuration); System.out.println(fileSystem.toString()); }
(4)遍歷 HDFS 中全部文件
遞歸遍歷
@Test public void listFile() throws Exception{ FileSystem fileSystem = FileSystem.get(new URI("hdfs://192.168.52.100:8020"), new Configuration()); FileStatus[] fileStatuses = fileSystem.listStatus(new Path("/")); for (FileStatus fileStatus : fileStatuses) { if(fileStatus.isDirectory()){ Path path = fileStatus.getPath(); listAllFiles(fileSystem,path); }else{ System.out.println("文件路徑爲"+fileStatus.getPath().toString());
}
}
複製代碼
} public void listAllFiles(FileSystem fileSystem,Path path) throws Exception{ FileStatus[] fileStatuses = fileSystem.listStatus(path); for (FileStatus fileStatus : fileStatuses) { if(fileStatus.isDirectory()){ listAllFiles(fileSystem,fileStatus.getPath()); }else{ Path path1 = fileStatus.getPath(); System.out.println("文件路徑爲"+path1); } } }
使用 API 遍歷
@Test public void listMyFiles()throws Exception{ //獲取fileSystem類 FileSystem fileSystem = FileSystem.get(new URI("hdfs://192.168.52.250:8020"), new Configuration()); //獲取RemoteIterator 獲得全部的文件或者文件夾,第一個參數指定遍歷的路徑,第二個參數表示是否要遞歸遍歷 RemoteIterator locatedFileStatusRemoteIterator = fileSystem.listFiles(new Path("/"), true); while (locatedFileStatusRemoteIterator.hasNext()){ LocatedFileStatus next = locatedFileStatusRemoteIterator.next(); System.out.println(next.getPath().toString()); } fileSystem.close(); }
(5)下載文件到本地
@Test
public void getFileToLocal()throws Exception{
FileSystem fileSystem = FileSystem.get(new URI("hdfs://192.168.52.250:8020"), new Configuration());
FSDataInputStream open = fileSystem.open(new Path("/test/input/install.log"));
FileOutputStream fileOutputStream = new FileOutputStream(new File("c:\\install.log"));
IOUtils.copy(open,fileOutputStream );
IOUtils.closeQuietly(open);
IOUtils.closeQuietly(fileOutputStream);
fileSystem.close();
}
複製代碼
(6)HDFS 上建立文件夾
@Test
public void mkdirs() throws Exception{
FileSystem fileSystem = FileSystem.get(new URI("hdfs://192.168.52.250:8020"), new Configuration());
boolean mkdirs = fileSystem.mkdirs(new Path("/hello/mydir/test"));
fileSystem.close();
}
複製代碼
(7)HDFS 文件上傳
@Test
public void putData() throws Exception{
FileSystem fileSystem = FileSystem.get(new URI("hdfs://192.168.52.250:8020"), new Configuration());
fileSystem.copyFromLocalFile(new Path("file:///c:\\install.log"),new Path("/hello/mydir/test"));
fileSystem.close();
}
複製代碼
(8)僞造用戶
中止hdfs集羣,在node01機器上執行如下命令
cd /export/servers/hadoop-3.1.1 sbin/stop-dfs.sh
修改node01機器上的hdfs-site.xml當中的配置文件
cd /export/servers/hadoop-3.1.1/etc/hadoop vim hdfs-site.xml dfs.permissions.enabled true
修改完成以後配置文件發送到其餘機器上面去
scp hdfs-site.xml node02: PWD
重啓hdfs集羣
cd /export/servers/hadoop-3.1.1 sbin/start-dfs.sh
隨意上傳一些文件到咱們hadoop集羣當中準備測試使用
cd /export/servers/hadoop-3.1.1/etc/hadoop hdfs dfs -mkdir /config hdfs dfs -put *.xml /config hdfs dfs -chmod 600 /config/core-site.xml
使用代碼準備下載文件
@Test public void getConfig()throws Exception{ FileSystem fileSystem = FileSystem.get(new URI("hdfs://192.168.52.250:8020"), new Configuration(),"hadoop"); fileSystem.copyToLocalFile(new Path("/config/core-site.xml"),new Path("file:///c:/core-site.xml")); fileSystem.close(); }
(9)小文件合併
因爲 Hadoop 擅長存儲大文件,由於大文件的元數據信息比較少,若是 Hadoop 集羣當中有大量的小文件,那麼每一個小文件都須要維護一份元數據信息,會大大的增長集羣管理元數據的內存壓力,因此在實際工做當中,若是有必要必定要將小文件合併成大文件進行一塊兒處理
在咱們的 HDFS 的 Shell 命令模式下,能夠經過命令行將不少的 hdfs 文件合併成一個大文件下載到本地
cd /export/servers
hdfs dfs -getmerge /config/*.xml ./hello.xml
複製代碼
既然能夠在下載的時候將這些小文件合併成一個大文件一塊兒下載,那麼確定就能夠在上傳的時候將小文件合併到一個大文件裏面去
@Test
public void mergeFile() throws Exception{
//獲取分佈式文件系統
FileSystem fileSystem = FileSystem.get(new URI("hdfs://192.168.52.250:8020"), new Configuration(),"hadoop");
FSDataOutputStream outputStream = fileSystem.create(new Path("/bigfile.xml"));
//獲取本地文件系統
LocalFileSystem local = FileSystem.getLocal(new Configuration());
//經過本地文件系統獲取文件列表,爲一個集合
FileStatus[] fileStatuses = local.listStatus(new Path("file:///F:\\傳智播客大數據離線階段課程資料\\三、大數據離線第三天\\上傳小文件合併"));
for (FileStatus fileStatus : fileStatuses) {
FSDataInputStream inputStream = local.open(fileStatus.getPath());
IOUtils.copy(inputStream,outputStream);
IOUtils.closeQuietly(inputStream);
}
IOUtils.closeQuietly(outputStream);
local.close();
fileSystem.close();
}
複製代碼
MapReduce思想在生活中到處可見。或多或少都曾接觸過這種思想。MapReduce的思想核心是「分而治之」,適用於大量複雜的任務處理場景(大規模數據處理場景)。
Map負責「分」,即把複雜的任務分解爲若干個「簡單的任務」來並行處理。能夠進行拆分的前提是這些小任務能夠並行計算,彼此間幾乎沒有依賴關係。 Reduce負責「合」,即對map階段的結果進行全局彙總。 MapReduce運行在yarn集羣
ResourceManager
NodeManager
這兩個階段合起來正是MapReduce思想的體現。
MapReduce是一個分佈式運算程序的編程框架,核心功能是將用戶編寫的業務邏輯代碼和自帶默認組件整合成一個完整的分佈式運算程序,併發運行在Hadoop集羣上。
Hadoop MapReduce構思: 分而治之 對相互間不具備計算依賴關係的大數據,實現並行最天然的辦法就是採起分而治之的策略。並行計算的第一個重要問題是如何劃分計算任務或者計算數據以便對劃分的子任務或數據塊同時進行計算。不可分拆的計算任務或相互間有依賴關係的數據沒法進行並行計算! 統一構架,隱藏系統層細節 如何提供統一的計算框架,若是沒有統一封裝底層細節,那麼程序員則須要考慮諸如數據存儲、劃分、分發、結果收集、錯誤恢復等諸多細節;爲此,MapReduce設計並提供了統一的計算框架,爲程序員隱藏了絕大多數系統 層面的處理細節。 MapReduce最大的亮點在於經過抽象模型和計算框架把須要作什麼(what need to do)與具體怎麼作(how to do)分開了,爲程序員提供一個抽象和高層的編程接口和框架。程序員僅須要關心其應用層的具體計算問題,僅需編寫少許的處理應用自己計算問題的程序代碼。如何具體完成這個並行計算任務所相關的諸多系統層細節被隱藏起來,交給計算框架去處理:從分佈代碼的執行,到大到數千小到單個節點集羣的自動調度使用。 構建抽象模型:Map和Reduce MapReduce借鑑了函數式語言中的思想,用Map和Reduce兩個函數提供了高層的並行編程抽象模型 Map: 對一組數據元素進行某種重複式的處理; Reduce: 對Map的中間結果進行某種進一步的結果整理。 Map和Reduce爲程序員提供了一個清晰的操做接口抽象描述。MapReduce 處理的數據類型是鍵值對。 MapReduce中定義了以下的Map和Reduce兩個抽象的編程接口,由用戶去編程實現: Map: (k1; v1) → [(k2; v2)] Reduce: (k2; [v2]) → [(k3; v3)]
MapReduce 框架結構 一個完整的mapreduce程序在分佈式運行時有三類實例進程:
MRAppMaster 負責整個程序的過程調度及狀態協調
MapTask 負責map階段的整個數據處理流程
ReduceTask 負責reduce階段的整個數據處理流程
MapReduce 的開發一共有八個步驟, 其中 Map 階段分爲 2 個步驟,Shuffle 階段 4個步驟,Reduce 階段分爲 2 個步驟
Map 階段 2 個步驟
設置 InputFormat 類, 將數據切分爲 Key-Value(K1和V1) 對, 輸入到第二步
自定義 Map 邏輯, 將第一步的結果轉換成另外的 Key-Value(K2和V2) 對, 輸出結果
Shuffle 階段 4 個步驟
對輸出的 Key-Value 對進行分區
對不一樣分區的數據按照相同的 Key 排序
(可選) 對分組過的數據初步規約, 下降數據的網絡拷貝
對數據進行分組, 相同 Key 的 Value 放入一個集合中
Reduce 階段 2 個步驟
對多個 Map 任務的結果進行排序以及合併, 編寫 Reduce 函數實現本身的邏輯, 對輸入的 Key-Value 進行處理, 轉爲新的 Key-Value(K3和V3)輸出
設置 OutputFormat 處理並保存 Reduce 輸出的 Key-Value 數據
轉換爲代碼,例子以下
Map階段
public class WordCountMapper extends Mapper<Text,Text,Text, LongWritable> {
/**
* K1-----V1
* A -----A
* B -----B
* C -----C
*
* K2-----V2
* A -----1
* B -----1
* C -----1
*
* @param key
* @param value
* @param context
* @throws IOException
* @throws InterruptedException
*/
@Override
protected void map(Text key, Text value, Context context) throws IOException, InterruptedException {
context.write(key,new LongWritable(1));
}
}
複製代碼
Reduce階段
public class WordCountReducer extends Reducer<Text, LongWritable, Text, LongWritable> {
@Override
protected void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException {
long count = 0L;
for (LongWritable value : values) {
count += value.get();
}
context.write(key, new LongWritable(count));
}
}
複製代碼
shuffle階段,舉一個分區的例子:
public class WordCountPartitioner extends Partitioner<Text, LongWritable> {
@Override
public int getPartition(Text text, LongWritable longWritable, int i) {
if (text.toString().length() > 5) {
return 1;
}
return 0;
}
}
複製代碼
主方法
public class JobMain extends Configured implements Tool {
public static void main(String[] args) throws Exception {
ToolRunner.run(new Configuration(),new JobMain(),args);
}
@Override
public int run(String[] strings) throws Exception {
Job job = Job.getInstance(super.getConf(), "wordcout");
job.setJarByClass(JobMain.class);
//輸入
job.setInputFormatClass(TextInputFormat.class);
TextInputFormat.addInputPath(job,new Path("/"));
//map
job.setMapperClass(WordCountMapper.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(LongWritable.class);
//shuffle階段
job.setPartitionerClass(WordCountPartitioner.class);
job.setNumReduceTasks(2);
//reduce階段
job.setReducerClass(WordCountReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(LongWritable.class);
//輸出
job.setOutputFormatClass(TextOutputFormat.class);
TextOutputFormat.setOutputPath(job,new Path("/"));
return 0;
}
}
複製代碼
具體步驟:
讀取數據組件 InputFormat(默認 TextInputFormat) 會經過 getSplits 方法對輸入目錄中文件進行邏輯切片規劃獲得 splits, 有多少個 split 就對應啓動多少個MapTask . split 與 block 的對應關係默認是一對一
將輸入文件切分爲 splits 以後, 由 RecordReader 對象 (默認是LineRecordReader)進行讀取, 以 \n 做爲分隔符, 讀取一行數據, 返回 <key,value> . Key 表示每行首字符偏移值,Value 表示這一行文本內容
讀取 split 返回 <key,value> , 進入用戶本身繼承的 Mapper 類中,執行用戶重寫的 map 函數, RecordReader 讀取一行這裏調用一次
Mapper 邏輯結束以後, 將 Mapper 的每條結果經過 context.write 進行collect數據收集. 在 collect 中, 會先對其進行分區處理,默認使用 HashPartitioner。
環形緩衝區實際上是一個數組, 數組中存放着 Key, Value 的序列化數據和 Key,Value 的元數據信息, 包括 Partition, Key 的起始位置, Value 的起始位置以及Value 的長度. 環形結構是一個抽象概念
緩衝區是有大小限制, 默認是 100MB. 當 Mapper 的輸出結果不少時, 就可能會撐爆內存, 因此須要在必定條件下將緩衝區中的數據臨時寫入磁盤, 而後從新利用這塊緩衝區. 這個從內存往磁盤寫數據的過程被稱爲 Spill, 中文可譯爲溢寫. 這個溢寫是由單獨線程來完成, 不影響往緩衝區寫 Mapper 結果的線程.溢寫線程啓動時不該該阻止 Mapper 的結果輸出, 因此整個緩衝區有個溢寫的比例 spill.percent . 這個比例默認是 0.8, 也就是當緩衝區的數據已經達到閾值 buffer size * spill percent = 100MB * 0.8 = 80MB , 溢寫線程啓動,鎖定這 80MB 的內存, 執行溢寫過程. Mapper 的輸出結果還能夠往剩下的20MB 內存中寫, 互不影響
當溢寫線程啓動後, 須要對這 80MB 空間內的 Key 作排序 (Sort). 排序是 MapReduce模型默認的行爲, 這裏的排序也是對序列化的字節作的排序若是 Job 設置過 Combiner, 那麼如今就是使用 Combiner 的時候了. 將有相同 Key 的 Key/Value 對的 Value 加起來, 減小溢寫到磁盤的數據量.Combiner 會優化 MapReduce 的中間結果, 因此它在整個模型中會屢次使用那哪些場景才能使用 Combiner 呢? 從這裏分析, Combiner 的輸出是Reducer 的輸入, Combiner 毫不能改變最終的計算結果. Combiner 只應該用於那種 Reduce 的輸入 Key/Value 與輸出 Key/Value 類型徹底一致, 且不影響最終結果的場景. 好比累加, 最大值等. Combiner 的使用必定得慎重, 若是用好, 它對 Job 執行效率有幫助, 反之會影響 Reducer 的最終結果
合併溢寫文件, 每次溢寫會在磁盤上生成一個臨時文件 (寫以前判斷是否有 Combiner),若是 Mapper 的輸出結果然的很大, 有屢次這樣的溢寫發生, 磁盤上相應的就會有多個臨時文件存在. 當整個數據處理結束以後開始對磁盤中的臨時文件進行 Merge 合併, 由於最終的文件只有一個, 寫入磁盤, 而且爲這個文件提供了一個索引文件, 以記錄每一個reduce對應數據的偏移量
Reduce 大體分爲 copy、sort、reduce 三個階段,重點在前兩個階段。copy 階段包含一個 eventFetcher 來獲取已完成的 map 列表,由 Fetcher 線程去 copy 數據,在此過程當中會啓動兩個 merge 線程,分別爲 inMemoryMerger 和 onDiskMerger,分別將內存中的數據 merge 到磁盤和將磁盤中的數據進行 merge。待數據 copy 完成以後,copy 階段就完成了,開始進行 sort 階段,sort 階段主要是執行 finalMerge 操做,純粹的 sort 階段,完成以後就是 reduce 階段,調用用戶定義的 reduce 函數進行處理
詳細步驟:
Copy階段 ,簡單地拉取數據。Reduce進程啓動一些數據copy線程(Fetcher),經過HTTP方式請求maptask獲取屬於本身的文件。
Merge階段 。這裏的merge如map端的merge動做,只是數組中存放的是不一樣map端copy來的數值。Copy過來的數據會先放入內存緩衝區中,這裏的緩衝區大小要比map端的更爲靈活。merge有三種形式:內存到內存;內存到磁盤;磁盤到磁盤。默認狀況下第一種形式不啓用。當內存中的數據量到達必定閾值,就啓動內存到磁盤的merge。與map 端相似,這也是溢寫的過程,這個過程當中若是你設置有Combiner,也是會啓用的,而後在磁盤中生成了衆多的溢寫文件。第二種merge方式一直在運行,直到沒有map端的數據時才結束,而後啓動第三種磁盤到磁盤的merge方式生成最終的文件。
合併排序 。把分散的數據合併成一個大的數據後,還會再對合並後的數據排序。
對排序後的鍵值對調用reduce方法 ,鍵相等的鍵值對調用一次reduce方法,每次調用會產生零個或者多個鍵值對,最後把這些輸出的鍵值對寫入到HDFS文件中。
map 階段處理的數據如何傳遞給 reduce 階段,是 MapReduce 框架中最關鍵的一個流程,這個流程就叫 shuffle shuffle: 洗牌、發牌 ——(核心機制:數據分區,排序,分組,規約,合併等過程)
Collect階段 :將 MapTask 的結果輸出到默認大小爲 100M 的環形緩衝區,保存的是key/value,Partition 分區信息等。
Spill階段 :當內存中的數據量達到必定的閥值的時候,就會將數據寫入本地磁盤,在將數據寫入磁盤以前須要對數據進行一次排序的操做,若是配置了 combiner,還會將有相同分區號和 key 的數據進行排序。
Merge階段 :把全部溢出的臨時文件進行一次合併操做,以確保一個 MapTask 最終只產生一箇中間數據文件。
Copy階段 :ReduceTask 啓動 Fetcher 線程到已經完成 MapTask 的節點上覆制一份屬於本身的數據,這些數據默認會保存在內存的緩衝區中,當內存的緩衝區達到必定的閥值的時候,就會將數據寫到磁盤之上。
Merge階段 :在 ReduceTask 遠程複製數據的同時,會在後臺開啓兩個線程對內存到本地的數據文件進行合併操做。
Sort階段 :在對數據進行合併的同時,會進行排序操做,因爲 MapTask 階段已經對數據進行了局部的排序,ReduceTask 只需保證 Copy 的數據的最終總體有效性便可。Shuffle 中的緩衝區大小會影響到 mapreduce 程序的執行效率,原則上說,緩衝區越大,磁盤io的次數越少,執行速度就越快
緩衝區的大小能夠經過參數調整, 參數:mapreduce.task.io.sort.mb 默認100M