爲了知道客戶端與HDFS,NameNode,DataNode交互過程當中數據的流向,請看圖3-2,這張圖顯示了讀取文件過程當中主要的事件順序。
客戶端經過調用FileSystem對象的open()方法打開一個但願從中讀取數據的文件,對於HDFS來講,FileSystem是一個DistributedFileSystem的實例對象(圖3-2 步驟1)。DistributedFileSystem遠程調用名稱節點(NameNode)獲得文件開頭幾個塊的位置。對於每個塊,名稱節點返回包含這個塊複本的全部數據節點(DataNode)的地址。進一步,這些數據節點會根據集羣的網絡拓撲結構按照距離客戶端的遠近進行排序。若是客戶端自己是一個數據節點(例如一個MapReduce任務),而這個數據節點包含要讀取的塊的複本,則客戶端會直接從本地讀取。java
DistributedFileSystem返回一個FSDataInputStream對象給客戶端,用於從文件中讀取數據。FSDataInputStream是一個輸入流,支持文件尋位(seek)。FSDataInputStream裏包裝了一個DFSInputStream類,這個類支持數據節點和名稱節點的I/O操做。node
客戶端調用read()方法從流中讀取數據。DFSInputStream存儲了文件中開頭幾個塊所在的數據節點的地址。首先鏈接第一個塊所在的最近的數據節點,數據從數據節點被讀取到客戶端,而後不斷地從這個流中讀取(步驟4)直接這個塊數據被讀完,而後DFSInputStream將會關閉到這個數據節點的鏈接,尋找下一個塊所在的最近的數據節點(步驟5)。這一系列操做對客戶端來講是透明的,它不用管。從客戶端的角度來看,它僅僅是在讀取一個連續的數據流。web
塊按順序依次被讀取。當客戶端從數據流中讀數的時候,DFSInputStream依次創建和關閉和數據節點的鏈接。若是須要,DistributedFileSystem將再次調用名稱節點獲得下一批塊全部數據節點的位置。當客戶端完成了全部數據的讀取,它會調用FSDataInputStream的close()方法關閉流(步驟6)。編程
在讀取的過程當中,若是DFSInputStream在與數據節點交互的過程當中出現了錯誤,它將會嘗試當前塊所在的最近的下一個數據節點。它也會記住那些交互失敗的數據節點以便讀取其它塊時再也不在這些失敗的數據節點中讀取。DFSInputStream也會校驗從數據節點傳過來的數據,若是塊中數據損壞了,它將嘗試從另外一個包含這個塊複本的數據節點中讀取。它也會向名稱節點報告這個損壞的塊。緩存
這樣設計一個重要的方面是客戶端直接與數據節點交互,並經過名稱節點的引導,找到每個塊所在的最好的數據節點。這樣設計可讓HDFS響應大量同時併發請求的客戶端。由於數據分佈在集羣中全部的數據節點中。並且,名稱節點僅僅須要響應獲取塊全部位置的請求(這個位置信息存儲在內存中,因此很是高效)而不須要響應獲取文件數據的請求。若是名稱節點還響應讀取文件數據的請求,那麼隨着客戶端數據增多,很快會出現瓶頸。網絡
Hadoop網絡拓撲結構
本地網絡的兩個節點對彼此"關閉"是什麼意思呢?在大批量數據處理環境中,限制速度的因素是節點以前傳輸的速率,帶寬幾乎對速度沒有一點貢獻,因此能夠用節點間的帶寬作爲衡量節點間距離的尺碼。但在實踐中並不直接去測試兩個節點間的帶寬,由於這很困難。Hadoop採起了一個簡單的途徑,網絡以樹的形式表示,兩個節點的距離等於各自距離他們共同上層節點的距離之和。樹中的層級並非預先設定好的,一般層級中有數據中心,機架(Rack)和正在運行進程的節點。下面場景中帶寬依次遞減:併發
- 相同節點上的處理
- 同一機架不一樣節點上的處理
- 同一數據中心不一樣機架中節點上的處理
- 不一樣數據中心中節點上的處理
例如:節點n1,在機架r1上,機架在數據中心d1上。用/d1/r1/n1,以這爲列,來看看下面四個場景中節點間距離:- distance(/d1/r1/n1,/d1/r1/n1)=0(相同節點上的處理)
- distance(/d1/r1/n1,/d1/r1/n2)=2(相同機架上不一樣節點)
- distance(/d1/r1/n1,/d1/r2/n3)=4(相同數據中心不一樣節點)
- distance(/d1/r1/n1,/d2/r3/n4)=6(不一樣數據中心節點)
圖3-3更加形象顯示了上面示例:
最後,你要知道hadoop並不知道你的網絡拓撲圖,須要你進行配置。然而,默認的狀況下,hadoop會假設全部節點在同一數據中心中一機架上。對於小型集羣,確實是這種狀況,這樣的話,就不須要進行額外的配置。異步
下一步,咱們將看看數據怎麼寫入到HDFS中的。雖然這是很細節的東西,但它有助於理解HDFS模型如何保證數據一致。tcp
咱們考慮這一種狀況,在HDFS中建立一個新文件,寫入數據,而後關閉文件。如圖3-4所示:ide
客戶端經過調用DistributedFileSystem類的create()方法建立文件(圖3-4步驟1)。DistributedFileSystem遠程調用名稱節點在文件系統的名稱空間中建立一個新文件,沒有塊與這個新文件關聯(步驟2)。名稱節點作各類各樣的檢查確保文件以前沒有被建立過,並且客戶端有權限建立這個文件。若是檢查經過,名稱節點將會記錄這個新文件,不然將建立失敗,拋給客戶端一個IOException異常。若是成功建立,則DistributedFileSystem返回一個FSDataOutputStream對象給客戶端,以便客戶端寫入數據。正如讀數據那樣,FSDataOutputStream封閉了DFSOutputStream類,用此類來與數據節點與名稱節點交互。
當客戶端寫數據的時候(步驟3),DFSOutputStream首先將數據拆分紅多個包,寫入"數據隊列"中。而後,DataStreamer過來消費這個數據隊列,它會向名稱節點請求一些合適的新塊用於存儲複本數據。名稱節點會返回包含這些新塊的數據節點列表。這些數據節點造成了一個通道,這裏,咱們假設複製級別是3,因此在這個通道中有三個節點。DataStreamer首先向這個通道中第一個數據節點寫入以前被拆分的包數據。第一個數據節點寫完後,會前進到第二個數據節點,第二個數據節點存儲包數據後繼續前進到第三個也是最後一個數據節點(步驟4)。
DFSOutStream也會在內部維護一個"包隊列"。只有當某一個包被全部節點存儲後,這個包纔會從包隊列中刪除(步驟5)。
若是在數據寫入過程當中,任何一個數據節點寫入失敗了,那麼將麼執行以下操做(這些操做對客戶端來講是透明的)。首先,通道關閉,包隊列中的全部包都將會放到數據隊列前面。這樣,失敗數據節點的下游數據節點不會錯過任何一個包。在好的數據節點上的當前塊被給予一個新的身份標識,將它傳送給名稱節點,以便之後當失敗的數據節點恢復後,它上面已經保存的部分塊數據將會被刪除。失敗的數據節點從通道中移除,再基於剩下兩個好的數據節點創建一個新通道。數據塊中剩餘的數據寫到管道中剩下好的數據節點中。名稱節點知道這個塊還須要複製,因此它會把它複製到另一個節點中.餘下的塊照常處理。
雖然不太可能,但在寫入數據的時候仍有可能幾個數據節點同時失敗,只要dfs.namenode.replication.min複本數(默認是1)有值,就會寫入成功。塊將會在集羣中異步複製直到達到設定的複本複製數(dfs.replication默認是3)。
當客戶端寫入數據完成後,將會調用close()方法關閉流(步驟6)。這個方法將會清除數據節點通道中剩下的包,並等待全部包數據寫入完成,而後通知名稱節點,整個文件已經寫入完成(步驟7)。名稱節點知道這個文件由哪些塊組成(由於DataStreamer是向名稱節點請求獲得塊的位置的),因此它僅須要等待塊完成了最小複製就能夠成功返回了。
複本存儲
名稱節點是怎麼知道選擇哪些數據節點存儲複本呢?這是在綜合權衡了可靠性,寫入數據帶寬和讀取數據帶寬以後獲得的結果。例如:若是將全部複本放在一個節點上將會形成最小的寫入帶寬(由於複製通道運行在一個節點上),並且,這不是真正的冗餘,由於若是這個節點損壞了,塊數據就會丟失。可是讀數據的帶寬會很高。另外一種極端的狀況,將複本放在不一樣的數據中心,這樣或許能最大化冗餘度,可是卻很消耗帶寬。即便在相同的數據中心中,也會有不少種不一樣的存儲策略。
Hadoop默認的策略是將第一個複本存放在客戶機所在的節點中(對於運行在集羣外的客戶端來講,將會隨機選擇一個節點,系統儘可能不會選擇已經存儲很滿或工做太忙的節點)。第二個複本存儲時將會選擇與第一個節點不在同一個硬盤陣列的另一個機架,隨機選擇一個節點存儲。第三個複本將會放在與第二個節點相同的機架中,可是存儲在隨機選擇的另一個節點中。其它的複本將會存儲在集羣中隨機選擇的節點中,系統儘可能避免將太量複本放到相同的機架中。
一旦複本的存儲位置肯定了,就會創建一個通道,結合考慮hadoop的網絡拓撲結構以後進行數據的寫入。對於複本個數爲3的狀況,通道也許如圖3-5所示:
總之,這個策略在可靠性(塊被存儲在兩個機架中),寫入帶寬(寫數據時僅須要經過一個網絡交換機),讀取性能(能夠選擇兩個機架中任意一個讀取),塊的分佈性(客戶端僅在本地機架中寫入一個塊)這些因素之間作了比較好的權衡。
文件系統的一致性模型描述了讀取文件中的數據或向文件寫入數據的可見性。HDFS爲了性能犧牲了一些POSIX標準的要求,致使一些操做可能與你指望的不同。
在建立一個文件後,正如所指望的那樣,在文件系統名稱空間中看見了這個文件。
Path p=new Path("p"); fs.create(p); assertThat(fs.exists(p),is(true));
然而,任何寫入到這個文件的數據不必定可見,即便輸出流被flush刷新了。這個文件的長度仍爲0。
Path p=new Path("p"); OutputStream out=fs.create(p); out.write("content".getBytes("UTF-8")); out.flush(); assertThat(fs.getFileStatus(p).getLen(),is(0L));
一旦超過一個hadoop塊的數據寫入了,第一個塊將對讀取器可見。對於後續的塊也是如此。當前正在被寫入數據的塊老是對新來的讀取器不可見。
HDFS經過FSDataOutputStream的hflush()方法能夠強迫緩存中的數據flush進數據節點。在hflush()方法成功返回後,HDFS確保已經寫入文件的數據都存進了寫數據管道中的數據節點中,而且對新來的讀取器可見。
Path p=new Path("p"); FSDataOutputStream out=fs.create(p); out.write("content".getBytes("UTF-8")); out.hflush(); assertThat(fs.getFileStatus(p).getLen(),is((long)"contents".length()));
注意hflush()不能確保數據節點已經將數據寫入磁盤中,僅僅確保數據存儲在數據節點的內存中(因此若是數據中心斷電了,數據將會丟失)。若是須要確保數據能寫入磁盤,請使用hsync()。
hsync()方法內部的操做與POSIX標準中fsync()標準命令類似,都會提交緩存中的數據到磁盤。例如,使用標準的JAVA API將數據寫入本地文件,在flush數據流和同步數據到磁盤後,就能夠確保能看見已經寫入文件的內容。
FileOutputStream out=new FileOutputStream(localFile); out.write("contents".getBytes("UTF-8")); out.flush();//flush操做系統 out.getFD().sync();//同步進磁盤 assertThat(localFile.length(),is((long)"contents".length()));
關閉HDFS的文件流時內部也會執行hflush()方法。
Path p=new Path("p"); OutputStream out=fs.create(p); out.write("contents".getBytes("UTF-8")); out.close(); assertThat(fs.getFileStatus().getLen(),is((long)"contents".length()));
一致性模型已經蘊涵了設計應用的方法。若是不調用hflush()和hsync(),當客戶端或系統故障時,你將會丟失大量數據。對不少應用來講,這是不可接受的。因此你應該在合適的時機調用hflush(),例如在寫入至關一部分數據記錄或字節以後。雖然hflush()這個方法在設計時考慮到不對HDFS形成太大負擔,可是它確實對性能有一些影響(hsync()有更多影響)。因此在數據健壯性與傳輸率之間要有一個權衡。一個可接受的平衡點是當以不一樣頻率調用hflush(),並在考量應用性能前提下,那些依賴應用,合適的數據都能被讀取到時。
到目前爲止咱們看到的HDFS獲取數據的形式都是單線程的。例如,經過指定文件通配符的方法,咱們能夠同時操做大量文件。但要想有效地併發處理這些文件 ,你必須本身編程。Hadoop提供了一個有用的程序,叫作distcp,用於併發地將數據複製到hadoop或從Hadoop複製數據。
distcp其中的一個用途是替代hadoop fs -cp命令。例如,你能夠複製一個文件到另外一個文件中經過使用
% hadoop distcp file1 file2
你也能夠複製目錄:
% hadoop distcp dir1 dir2
若是目錄dir2不存在,hadoop將會建立它。而且目錄1中的內容將複製到目錄dir2中。你能夠指定多個源路徑,全部這些源路徑下的文件都將會複製到目的目錄中。
若是目錄dir2已經存在了,dir1目錄將複製到它下一級,建立目錄結構dir2/dir1。若是這不是你所想要的,你能夠經過使用-overwrite選項,將數據以覆蓋的形式複製到dir2目錄下。你也能夠只更新那些已經改變的文件,使用-update選項。咱們經過一個示例說明。若是咱們在目錄dir1中修改了一個文件,咱們將會使用以下命令將dir1目錄的修改同步進dir2中。
% hadoop distcp -update dir1 dir2
distcp使用MapReduce做業方式實現,在集羣中併發運行多個map來進行復制工做,沒有reducer。每個file使用一個map複製。Distcp粗略地將全部文件等分紅幾份,以便給每個map分配近似相等的數據量。默認狀況下,最多使用20個map。可是這個值能夠經過在distcp中指定-m參數改變。
使用distcp一個很是經常使用的用途是在兩個HDFS集羣間傳輸數據。例如,下面命令在第二個集羣中建立了第一個集羣/foo目錄下文件的備份。
% hadoop distcp -update -delete -p hdfs://namenode1/foo hdfs://namenod2/foo
-delete參數使用distcp刪除目的目錄下有而源目錄沒有的文件或目錄。-p參數意思是文件的狀態屬性像權限,塊大小和複本個數都保留。你能夠不帶任何參數運行distcp命令來查看參數的詳細使用說明。
若是這兩個集羣運行不一樣版本的HDFS,那麼你可使用webhdfs協議在兩個集羣間複製。
% hadoop distcp webhdfs://namenode1:50070/foo webhdfs://namenode2:50070/foo
另外一種變通的方法可使用HTTPFS代理作爲distcp的源或目的地(它也使用了webhdfs協議,能夠設置防火牆和控制帶寬,參看"HTTP章節")。
當將數據複製到HDFS中時,考慮集羣的平衡性很重要。當文件塊在集羣中均勻連續存儲時,HDFS可以表現地最好。因此你使用distcp時也要確保不打破這個規則。例如,若是你若是指定-m 1,將會有一個map進行復制工做,先不考慮這樣作效率很低,沒有充分有效地利用集羣資源,這樣作就意味着每個塊的第一個複本將位於運行map任務的節點上(直到磁盤滿了)。第二個和第三個複本將會在集羣其它節點上。可是這樣就達不到平衡,若是使集羣中map任務數比節點數多,就能夠避免這個問題。因此,當運行distcp命令時,最好使用默認的每個節點20個map任務。
然而,不可能一直保持集羣平衡。也許你想要限制map任務的個數,以便節點上資源可以被其它做業使用。這種狀況下,你可使用平衡工具(可參看"平衡器章節")使集羣中的塊分佈地更加均衡。
本文是筆者翻譯自《OReilly.Hadoop.The.Definitive.Guide.4th.Edition》第一部分第3章,後續將繼續翻譯其它章節。雖盡力翻譯,但奈何水平有限,錯誤再所不免,若是有問題,請不吝指出!但願本文對你有所幫助。
本文轉自個人簡書博客