階段一補充:讀《Hadoop權威指南》

1、記錄我在書中看到的有趣的觀點

  1. Hadoop和傳統關係型數據庫的比較(P8 1.5 相對於其餘系統的優點)
  • 爲何不能經過給傳統關係型數據庫配備大容量硬盤的方式來進行大規模的數據分析?

  這個問題的答案來自於計算機硬盤的發展趨勢:尋址時間遠遠比不上傳輸速率的提高。尋址是將磁頭移動到特定的硬盤位置進行讀或者寫操做,而傳輸速率取決於硬盤的帶寬。而尋址就是致使磁盤操做延遲的主要緣由。   當數據量很小時,傳統的數據庫依靠B樹(一種數據結構,主要用於關係型數據庫索引),能夠實現快速的讀取和更新;可是當數據量很大時,由於須要不少的「排序/合併」操做,傳統的數據庫系統就明顯落後於MapReduce了。   java

  • 2007年1月,數據庫理論專家Dewitt和Stonebreaker發表《MapReduce:一個歷史性的大倒退》,他們這麼作的理由是什麼?

  簡而言之,他們認爲:1. MapReduce放棄了 不少那些通過歷代數據庫專家優化提出的高性能數據庫技術,好比批量導入、索引、視圖、更新、事物等;2. MapReduce是一個粗糙的實現,它沒有索引,依靠蠻力做爲處理選項;3. MapReduce並不稀奇,以前就已經有人在使用類似的概念而且作出產品來了。   node

  • Hadoop和關係型數據庫的另外一個區別在於他們所操做的數據集的結構化程度。

  結構化數據是指具備既定格式的實體化數據,好比XML文檔或者知足特定預約義格式的數據表。這是RDBMS(關係型數據庫)包含的技術。   半結構化數據比較鬆散,它可能有格式,可是常常被忽略,因此通常只能做爲對數據結構的通常性指導。好比Excel電子表格,它在結構上是單元格組成的網格,可是你能夠在單元格中保存任何格式的數據。   非結構化數據沒有什麼特別的內部結構,例如純文本或者圖像數據。   Hadoop對半結構化和非結構化數據很是有效,由於它是在處理數據時纔對數據進行解釋(所謂的「讀時模式」)。這種模式在提供靈活的同時,避免了RDBMS在數據加載階段帶來的高開銷,由於這在Hadoop中僅僅是一個簡單的文件拷貝操做。(這也是爲何Hadoop能夠進行高速流式讀/寫的緣由)   關係型數據每每是規範的,,這主要是爲了數據的完整性且不含冗餘。可是規範性會給Hadoop處理帶來麻煩,由於它使記錄變成非本地操做,而Hadoop的核心假設之一恰恰就是能夠進行(高速的)流讀/寫操做。(暫時還沒理解這句話的意思)而WEB日誌就是典型的非規範化數據,例如每次都要記錄客戶端主機的名字和IP,這會致使同一個客戶端的全名可能出現屢次。   MapReduce以及Hadoop中其餘的處理模型都是能夠隨着數據規模現行伸縮的(可擴展性)。對數據分區後,函數原語(如map程序和reduce程序)可以在各個分區上面並行工做。這意味着,若是輸入的數據量是原來的兩倍,那麼做業的運行時間也須要兩倍。可是若是集羣的規模擴展爲原來的兩倍,那麼做業的速度依然能夠變得和原來同樣快。(分數不夠,拿錢來湊...)可是SQL查詢不具有這樣的特性。 2. 高性能計算和Hadoop計算web

  • 什麼是高性能計算(High Performance Computing,HPC)?

  高性能計算相關的組織多年以來一直研究大規模的數據處理,主要使用相似於消息傳遞接口MPI的相關API。   從廣義上講,高性能計算採用的方法是將做業分散到集羣的各臺機器上,這些機器訪問存儲區域網絡(SAN)組成的共享文件系統。這比較適合計算密集型的做業。可是若是節點須要訪問的數據量更龐大,不少計算節點就會由於網絡帶寬的瓶頸而不得不閒下來等數據。   而對於Hadoop而言,它儘可能在計算節點上存儲數據,以實現數據的本地快速訪問。這在Hadoop中稱爲數據本地化。意識到網絡帶寬是數據中心環境最珍貴的資源(處處複製數據很容易耗盡網絡帶寬)以後,Hadoop採用顯式網絡拓撲結構來保留網絡帶寬(將網絡當作一棵樹,兩個節點之間的距離使他們到共同的祖先的距離綜合)。 3. MapReduce數據流(P31 2.4 橫向擴展)   首先定義一些術語:MapReduce做業(Job)是客戶端須要執行的一個工做單元,它包括輸入數據、MapReduce程序和配置信息。而Hadoop將做業分紅若干任務(task)來執行,任務有兩類:map任務和reduce任務。這些任務運行在集羣的節點上,並經過Yarn調度。若是一個任務失敗,它Yarn將在另一個不一樣的節點上自動從新調度運行。Hadoop將MapReduce的輸入數據劃分紅等長的數據塊,稱爲「輸入分片」,簡稱「分片」。   算法

  • 分片後的數據塊的大小爲何不能超過HDFS的塊大小?

  這個數據塊的大小應該不能超過HDFS的塊大小,緣由是:1. 數據本地化。Hadoop在存儲有HDFS數據的節點上運行Map任務能夠得到最佳性能,由於它無需使用寶貴的集羣帶寬資源;若是在本地找不到,Yarn就會調度同一個機架上的空閒slot來運行該任務,此時屬於同機架內不一樣節點間調度,有時候還會跨機架調度,可是這種狀況幾乎見不到。2. 若是分片後的數據跨過了兩個HDFS塊,因爲Hadoop的容災機制,任何一個節點都不可能同時存儲這兩個數據塊,所以不免會跨節點甚至是跨機架,效率會變得很低。   shell

  • map任務將其輸出結果寫入到本地硬盤,而不是HDFS,爲何?

  由於map的輸出是中間結果,這個中間結果還要通過reduce處理後才能產生最終結果。並且一旦map做業成功完成,這個中間結果就會被刪除;一旦執行失敗,就會被從新調度運行。若是把這個中間結果存儲在HDFS上並實現備份,未免有點小題大作。   數據庫

  • reduce相關,以及如何肯定reduce的數量?      reduce的輸出一般存儲在HDFS中以實現可靠存儲。具體作法是:對於reduce輸出的每個HDFS塊,第一個副本存儲在本地節點上,其餘副本處於可靠性存儲在其餘機架上的節點中。   單個reduce任務的輸入一般來自於集羣中全部mapper的輸出;若是有好多個reduce任務,那麼每個map任務就會針對map的輸出進行分區(partition),一個map鍵對應一個reduce分區。以後進行shuffle(混洗)操做,將不一樣的map輸出按照key分發到不一樣的reduce中去,結果是每個reduce中的鍵值都是相同的。最後reduce會對輸入的相同的鍵值進行merge操做,最後再輸出以持久化。   
  • 什麼是combiner函數?

  集羣的帶寬資源很寶貴,所以儘可能避免map任務和reduce任務之間的數據傳輸是有必要的。Hadoop容許用戶針對map的輸出指定一個combiner(就像map同樣),combiner的輸入來自map,輸出做爲reduce的輸入。combiner屬於優化方案,因此有沒有combiner、或者調用多少次combiner,reduce輸出的結果都應該是同樣的。   簡單理解,combiner至關於對map的輸出結果進行一個相似於reduce的補充運算,以此來減小mapper和reducer之間的數據傳輸量。舉一個例子,咱們的目的是統計1950年全國最高氣溫,第一個map的輸出爲((1950,0),(1950,10),(1950,20)),第二個map的輸出爲((1950,25),(1950,20)),沒有combiner的時候,reduce的輸入爲(1950,[0,10,20,25,20]),最後的輸出爲(1950,25)。當咱們使用combiner以後,第一個map的輸出爲(1950,20),第二個map的輸出爲(1950,25),那麼reduce的輸入爲(1950,[20,25]),最後仍是輸出(1950,25)。更簡單的說,咱們能夠經過下面的表達式來講明這個計算過程:   沒有使用combiner:max(0,10,20,25,20)=25。   沒有使用combiner:max(max(0,10,20),max(25,20))=max(20,25)=25。 4. 使用本身喜歡的語言實現MapReduce程序(P37 2.5 HadoopStreaming)apache

  • 其餘語言如何調用Hadoop接口?

  Hadoop提供了MapReduce的API,容許編程人員使用其餘的非Java語言來寫本身的map函數和reduce函數。Hadoop Streaming 使用 Unix標準輸入輸出流做爲Hadoop和應用程序之間的接口,因此,任何能進行Unix輸入與輸出的編程語言都能實現MapReduce程序。   map函數的輸入來自標準輸入,輸出結果寫到標準輸出。map輸出的鍵-值對是以一個製表符(\t)分割的行,而且是通過對鍵排序的。   reduce的輸入來自於標準輸入,輸出結果寫到標準輸出(Hadoop能夠接受此標準輸出,並將其持久化)。reduce的輸入與map的輸出相同。 5. HDFS(P42 3.1 HDFS的設計)編程

  • datanode和namenode   namenode管理着文件系統的命名空間。它維護者文件系統樹以及整棵樹內的全部文件和目錄。namenode 的本地磁盤上永久保存着:命名空間鏡像文件編輯日誌文件。namenode的內存中保存着文件系統的元數據。   沒有namenode,文件系統將沒法使用。若是運行namenode的機器損壞,文件系統上的全部文件都將丟失,由於咱們不知道如何根據datanode上的數據塊來重建完整的文件。爲此,咱們須要進行容錯操做——Hadoop HA(高可用)。   在HA模式中,配置了一對活動-備用namenode(active-standby namenode),當處於active的namenode出現故障,standby namenode會接管它的任務並開始服務來自於客戶端的請求,不會有任何明顯的中斷。具體實現爲:1. namenode之間經過高可用實現編輯日誌的共享;2. datanode須要同時向兩個namenode發送數據塊處理報告,這是由於數據塊的映射信息存儲在內存中而不是本次磁盤中。   
  • HDFS文件寫入的時候,須要進行存儲的datanode經過管道鏈接起來,爲何Hadoop接口要在確認收到全部數據包的確認消息後纔會將確認隊列刪除?datanode的副本該怎麼存放?

  剛開始我對這塊也很好奇:爲何該接口不在確認存入第一個節點以後就返回結果,以後節點之間採用異步的方式將副本同步?後面我看到了這句話:只要寫入了dfs.namenode.replication.min的副本數(默認爲1),寫操做就會成功,而且這個塊能夠在集羣中異步複製,直到達到其目標副本數(dfs.replication的默認值爲3)。   namenode選擇在哪一個datanode上存儲副本,要從可靠性、寫入帶寬、讀取帶寬之間進行權衡。通常的作法是:第一個副本放在運行客戶端的節點上(若是運行客戶端的節點不是datanode,那麼就隨機選擇一個不那麼忙也不那麼滿的節點);第二個節點存放在與第一個節點不一樣的機架上的任意一個節點;第三個節點存放在與第二個節點相同機架的另外一任意節點。 6. HDFS的I/O操做(P96 5.1)bootstrap

  • HDFS如何保證數據的完整性?   經過對寫入的全部數據計算校驗和,並在讀取的時候驗證校驗和

  寫數據時:由DataNode負責對所要存儲的數據的驗證操做。寫數據的客戶端將數據以及校驗和發送給一系列DataNode組成的管線,管線中的最後一個DataNode負責校驗操做。若是最後一個DataNode檢測到錯誤,客戶端就會收到一個IOException。      讀數據時:由客戶端進行驗證操做。每個DataNode都會保存一個用於驗證的校驗和日誌,裏面記錄每個數據塊的最後一個驗證時間。客戶端收到數據以後進行驗證,若是成功,則修改該日誌;若是失敗,會執行下面的操做:1. 向NameDode彙報出錯的數據塊block以及這個數據塊所在的DataNode,同時拋出ChecksumException異常;2. NameNode將這個block標記爲已損壞,這樣它就不會再將一樣的客戶端請求發送到這個節點,同時嘗試將這個block的一個副本複製到另外的DataNode,使得數據塊的副本因子回到指望值;3. 將這個已經損壞的block刪除。    - 序列化操做?bash

  序列化是指將結構化對象轉換爲字節流以便在網絡上傳輸或者寫入到磁盤中進行永久存儲的過程。反序列化是指將字節流轉回結構化對象的逆過程。   序列化在分佈式數據處理有兩大應用場景:進程間通訊永久存儲。   在Hadoop生態中,系統中多個節點上的進程之間的通訊是經過**RPC(Remote Produce Call,遠程過程調用)**實現的。RPC協議將消息序列化爲二進制流以後發送到遠端節點,遠端節點接收到二進制流以後將其反序列化爲原始消息。一般狀況下,RPC序列化有如下特色:緊湊、快速、可擴展、支持互操做。Hadoop編程中使用Writable接口實現序列化。

2、一些細節

(一)、map

  1. map的階段有哪些?
  • read階段:經過用戶編寫的RecordReader,從輸入的InputSplit中解析出一個個key/value。
  • map階段:將解析出的kay/value交給用戶編寫的map函數處理,併產生新的key/value。
  • collect階段:map階段處理完成後,會調用OutputCollect.collect()輸出結果。在這個函數內部, 會將生成的key/value分片(調用partitioner),並將結果寫入一個環形緩衝區中。
  • spill階段:緩衝區滿後,map會將緩衝區中的文件刷寫到本地磁盤,生成一個臨時文件。須要注意的是,刷寫以前,會先對數據進行一次本地排序(快排),並在必要的時候對數據進行合併(combiner)和壓縮(IFile)。
  • merge階段:map對全部的臨時文件進行一個合併,確保最終一個map會輸出一個數據文件。
  1. mapper的數量由哪些因素決定?

  具體由輸入文件數目輸入文件大小配置的參數決定的。   首先了解配置參數:

mapreduce.input.fileinputformat.split.minsize // 啓動map的最小split size ,默認爲0
mapreduce.input.fileinputformat.split.maxsize // 啓動map的最大split size ,默認爲256
dfs.block.size  // Hadoop系統中的block塊大小,默認爲128M
splitsize = Math.max(minsize, Math.min(maxSize,blockSize))
複製代碼

  例如:默認狀況下,一個輸入文件800M,那麼mapper的數量應該爲7個,其中6個大小爲128M,1個大小爲32M;   再例如:一個目錄下有三個文件,大小分別爲5M、10M和150M,那麼這個時候會產生四個mapper,它們所處理的數據大小分別爲5M、10M、128M和22M;   固然咱們也能自定義mapper的數量。好比使上面的mapper數量變成2,一個處理大小爲128M、另外一個處理大小爲37M(5 + 10 + 22),具體實現能夠經過設置具體的參數。 2. 環形緩衝區細節:

  map task的任務輸出首先會進入到一個緩衝區內,這個緩衝區默認大小爲100M,當緩衝區達到容量的80%的時候,一個單獨的守護進程——spill(溢寫)進程會將緩衝區的內容溢寫(spill)到本地磁盤。溢寫是一個單獨的進程,不會影響map端的繼續輸出,可是當溢寫的過程當中寫入速度過快致使緩衝區滿,那麼map的寫操做就會被阻塞,直到溢寫完成。    3. partition細節:

  首先要知道的是,默認狀況下,無論有多少個map任務,一個reduce任務只會產生一個輸出文件。可是有時候咱們須要將最終的輸出數據分散到不一樣的文件中去,好比按照省份劃分,將同一個省份的數據寫入同一個文件中,最終有多個文件。而最終的數據來源於reduce,也就是說,若是要獲得多個文件,意味着須要一樣數量的reducer運行。而reduce的數據輸入來自於map,也就是說,咱們要實現多個reduce,得根據map的不一樣輸出作手腳,將不一樣的map輸出按照自定義規則分配給不一樣的reduce任務。而map任務劃分數據的過程稱爲partition。   partition就是提早對輸入進行處理,根據自定義的reduce進行分區,到了reduce處理的時候,只須要處理對應的分區數據就好了。   默認的partition方法以下所示:

public int getPartition(k Key, v Value, int numReduceTasks){
   	 return (key.hashCode() & Inter.MAX_VALUE) % numReduceTasks;
}
複製代碼

  前面括號內的結果表示將key的hash值變成一個非負值。numReduceTasks指的是reducer的數量,默認值是1。由於任何一個非負整數除以1的結果是0,也就是說,在默認狀況下,getPartition方法的返回值老是1。也就是說,mapper的任務輸出老是送給一個reducer,最終只能輸出到一個文件中去。這裏須要注意的是,若是numReduceTasks的數量爲0,那麼map會將結果直接刷寫到HDFS上去做進一步處理。   固然咱們能夠重載此方法,進而實現自定義的partition。   對map輸出的每個鍵值對,系統都會經過這個方法給定一個partition。若是一個鍵值對的partition值爲0,那麼它將會交給第1個reducer去處理,partition值爲1,會交給第二個reducer去處理。    4. sort和spill細節:

  當溢寫線程啓動的時候,須要對緩衝區中80%的數據作排序。這裏的排序是針對序列化的字節根據鍵值作排序,使用Hadoop本身定義的排序算法,具體實現細節爲快速排序法(先按照分區編號partition進行排序,再按照key進行排序),這樣的結果是,數據以分區彙集在一塊兒,而且同一個分區中的數據按照key有序。   溢寫的時候,按照分區編號由小到大依次將每一個分區中的數據寫入到任務工做目錄的臨時文件output/spillN.out(N表示當前溢寫的次數)中。   若是設置了combiner,那麼這個時候也會進行combiner操做。combiner會將形同key的key/value對的value按照設定的操做,好比加起來,減小溢寫到磁盤的數據量。combiner會優化map的輸出結果,可是不能對reduce的輸出結果產生影響,在這個前提下,combiner是個好東西,會在模型中屢次使用。   在優化的時候,爲了儘可能減小對磁盤文件的I/O操做,能夠在這一步對spill的文件進行壓縮,使其編程IFile格式,這也將有利於後面的merge操做,由於merge的時候須要根據spill後的文件大小進行排序操做。    5. merge細節:

  每一次的spill都會產生一個spill文件,因此map task計算的時候會不斷產生不少的spill文件,在map task結束以前會對這些spit文件進行合併造成一個已分區而且排序的輸出文件(能夠控制一次可以合併多少流,默認是10)0,最終的文件也只有一個,這個過程就是merge。 讓每個map task最終只生成一個數據文件,能夠避免同時打開大量文件金額同時讀取大量小文件產生的隨機讀取帶來的開銷。   每個map task都有一個緩衝區,存儲着map的輸出結果,當緩衝區快滿的時候須要將緩衝區的數據以臨時文件的形式存放到磁盤,當整個map task結束後再對磁盤中這個map task產生的全部臨時文件作合併,生成最終的正式輸出文件,而後等待reduce task來拉取數據。   merge採用多輪遞歸合併的方式,首先根據全部文件的大小創建小根堆,而後選取前十個元素,依次迭代讀取spill下來的文件中的key-value,並將生成的文件又放回到原來的文件中,以後再次創建小根堆,再執行上面的操做。因此merge的過程能夠當作建堆=>選取前十個元素迭代合併=>再建堆=>再合併……

(二)、shuffle

  shuffle要作的事是怎麼把map task的輸出結果有效地傳送到reduce端,或者能夠這樣理解,shuffle描述着數據從map task到reduce task輸入的這段過程。   在map處理階段結束後,會將多個文件合併成一個文件,這個文件中相同分區的數據放在一塊兒,同一個分區中的數據按照key有序。在reduce階段須要從多個map task中獲取屬於該reduce的分區的結果值,而後根據獲取到的文件大小決定放在內存中仍是刷寫到磁盤中,也就是說,從每個map中獲取的數據是一段內存或者一個文件,而後對內存和文件進行和map階段類似的merge操做,將結果中的每一行key/value執行reduce函數。   

enter image description here
  如上圖所示Map階段須要對多個文件的數據合併,在Reduce階段Reduce Task 1向各個Map獲取分區1的數據,Reduce Task 2向各個Map獲取分區2的數據,Reduce Task 3向各個Map獲取分區3的數據,圖中爲了簡便,Reduce Task3向map獲取數據的指向沒有標識。   注意:reduce從map獲取的是一個out文件中對應的partition數據,而不是整個out文件!!!

(三)、reduce

  1. reduce階段:
  • shuffle階段:也稱爲copy階段。reduce task從map task上遠程拷貝本身的partition,若是這片數據大小超過必定閾值,那就寫到磁盤上,不然直接放到內存中 。因此下面的merge操做會有內存到內存(不使用)、內存到磁盤、磁盤到磁盤的操做。
  • merge階段:根據MapReduce的定義,用戶編寫的reduce()函數的輸入數據是按照key進行聚合的一組數據。爲了將key相同的數據聚合在一塊兒,Hadoop採用了基於排序的merge策略。因爲每個map task的結果已是基於key局部有序的,所以,reduce task進行merge的時候只進行一次歸併排序便可。
  • reduce階段:reduce task將每一組數據依次交給用戶編寫的reduce()函數處理(對每個鍵調用reduce()函數)。
  • write階段:reduce()函數將計算結果持久化到HDFS上。

(四)、經過一個例子來了解MapReduce過程

  咱們的目的是統計兩個文件中每一個單詞出現的總次數。   首先建立兩個文件,做爲咱們的輸入:

file 1:
His name is Tom
Tom comes from Yunge

file 2:
His name is Jerry 
Jerry comes from Lingge
複製代碼
第一步,map

  map,映射,也就是拆解的意思。   咱們的輸入是兩個文件,在默認狀況下,會產生兩個split,也就是兩個mapper:mapper1和mapper2。   接下來,這兩個mapper會分別將文件內容分解爲單詞和1(注意,這裏的1不是具體數量,只是數字1),其中單詞是咱們的主鍵也就是key,後面的數字就是對應的value。   那麼每個mapper對應的輸出爲:    mapper1:

His        1
name        1
is        1
Tom        1
Tom        1
comes        1
from        1
Yunge        1
複製代碼

mapper2:

His        1
name        1
is        1
Jerry        1
Jerry        1
comes        1
from        1
Lingge        1
複製代碼
第二步,partition

  partition,分區。爲何要分區?由於後面會有多個reducer,每個reducer只幹本身的事,這回=會讓效率提高很多。partition就是提早對輸入進行處理,根據自定義的reduce進行分區,到了reduce處理的時候,只須要處理對應的分區數據就好了。   那麼如何分區呢?主要依據就是按照key將數據按照reduce分紅對應數量的組,就像彙總硬幣同樣,一元的進入1號桶,5角的進入2號桶,一角的進入三號桶。這個很重要的一點是須要保證key的惟一性,所以最多見的方法就是使用hash函數。這裏咱們假設有兩個reducer,咱們將首字母對應的字母順序進行除2取模,所以每個mapper進行partition以後的結果以下:

mapper1:

partition 1:
	His        1
	is        1
	comes        1
	from        1
	
partition 2:
	name        1
	Tom        1
	Tom        1
	Yunge        1
複製代碼

mapper2:

partition 1:
	His        1
	is        1
	Jerry        1
	Jerry        1
	comes        1
	from        1
	Lingge        1

partition 2:
	name        1
複製代碼

  其中partition 1是給reducer 1處理的,partition 2是給reducer 2處理的。   能夠看到,partition只是按照key進行了簡單的分區,並無任何別的處理,而且每個分區中的key不會出如今另外一個分區裏面。

第三步,sort

  sort,排序。由於後面的reducer也會作排序,可是它只是作一個歸併排序,要求每個mapper的輸出結果也是基於key有序的。這裏咱們根據首字母進行字典排序:

mapper1:

partition 1:
	comes        1
	from        1
	His        1
	is        1
	
partition 2:
	name        1
	Tom        1
	Tom        1
	Yunge        1
複製代碼

mapper2:

partition 1:
	comes        1
	from        1
	His        1
	is        1
	Jerry        1
	Jerry        1
	Lingge        1

partition 2:
	name        1
複製代碼

  能夠看到,每個partition中的數據都按照key作了排序。

第四步,combine

  combine能夠理解成一個mini reducer,它發生在spill到本次磁盤過程以前,目的就是把送到reducer的數據實現進行一次計算,以減小文件大小、減小對網絡帶寬的消耗。可是要注意的是,combine操做是可選的,若是要加上,請務必保證通過combine以後的數據不會對最終的reduce結果產生影響。下面咱們執行combine:

mapper1:

partition 1:
	comes        1
	from        1
	His        1
	is        1
	
partition 2:
	name        1
	Tom        2
	Yunge        1
複製代碼

mapper2:

partition 1:
	comes        1
	from        1
	His        1
	is        1
	Jerry        2
	Lingge        1

partition 2:
	name        1
複製代碼

  由於最後reducer執行的操做是add,那麼提早add和後面add的效果是同樣的,所以這個combiner是有效的。能夠看到,結果中,對重複的單詞進行了簡單的彙總。

第五步,copy

  copy,也叫shuffle,就是reducer本身從mapper拉去數據。每個reducer只拉取屬於本身partition的數據,結果以下:

reducer 1:

partition 1:(來自mapper1)
	comes        1
	from        1
	His        1
	is        1

partition 1:(來自mapper2)
	comes        1
	from        1
	His        1
	is        1
	Jerry        2
	Lingge        1
複製代碼

reducer 2:

partition 2:(來自mapper1)
	name        1
	Tom        2
	Yunge        1


partition 2:(來自mapper2)
	name        1
複製代碼

  能夠看到,經過shuffle操做,相同partition的數據落到了同一個節點(reducer)上。

第六步,merge

  merge,合併。將reducer獲得的文件合併成同一個文件,須要注意的是,這個過程也包含了排序。結果以下:

reducer 1:

comes        1
comes        1
from        1
from        1
His        1
His        1
is        1
is        1
Jerry        2
Lingge        1
複製代碼

reducer2:

partition 2:
	name        1
	name        1
	Tom        2
	Yunge        1
複製代碼
第七步,reduce

  reduce,歸併。最終的一步,將相同的key的value加1。結果以下圖:

reducer 1:

comes        2
from        2
His        2
is        2
Jerry        2
Lingge        1
複製代碼

reducer2:

partition 2:
	name        1
	Tom        2
	Yunge        1
複製代碼

  大功告成!咱們統計了兩個文件中每個單詞的數目,由於有兩個reducer,所以輸出結果會有兩個文件,即part-000000和part-00001。

(五)、Yarn資源調度框架

資源調度過程分解:

  1. 客戶端向Yarn提交應用並請求得到一個ApplicationMaster實例;
  2. ApplicationManager找到一個能夠運行Container的NodeManager,並在這個Container中啓動一個ApplicationMaster實例;
  3. ApplicationMaster向ApplicationManager註冊,註冊以後客戶端就能夠經過查詢ResourceManager從而得到本身提交應用的ApplicationMaster的詳細信息,以後client就能夠和ApplicationMaster直接通訊了;
  4. ApplicationMaster向ResourceManager申請執行這次任務所須要的資源,ResourceManager擁有集羣中全部NodeManager的Container(各個節點的資源)的信息,而後經過Scheduler進行調度;
  5. Scheduler將具體某個NodeManager上的Container的使用權交給這個ApplicationMaster;
  6. 一旦ApplicationMaster申請到資源以後,便會與該NodeManager通訊,要求啓動相應的Container執行任務;
  7. NodeManager爲任務設置好運行環境(包括環境變量、JAR包、二進制程序等),以後將任務命令寫入腳本,並經過腳本啓動相應的Container執行任務;
  8. 每個任務經過RPC協議向ApplicationMaster彙報本身的狀態和任務執行進度,以便讓ApplicationMaster掌握每個子任務的執行狀態,一方面能夠在任務失敗以後從新啓動任務,另外一方面方便客戶端經過ApplicationMaster掌握任務的執行狀況;
  9. 應用程序的任務完成後,ApplicationMaster向ApplicationManager註銷而且關閉本身,將用到的全部Container也一併歸還系統。
每一個組件功能詳解:
  1. Yarn簡介:   Yarn(Yet Another Resource Negotiator)被稱爲新一代Hadoop集羣的資源調度系統。Hadoop2.0對MapReduce框架進行了完全的設計與重構,在2.0版本中,它將1.0中的JobTracker拆分紅了兩個獨立的服務:一個全局資源管理器ResourceManager和一個針對局部應用的ApplicationMaster。前者負責整個系統的資源調配和管理,後者只針對某一個應用程序的資源管理。   在Hadoop2.0中,MapReduce中的一個Job變成了application,由於在hadoop2.0中,須要進行資源調度的應用不止MapReduce,還有Hadoop生態中的其餘應用,好比storm、hive、Hbase、spark等,這些須要使用Hadoop的底層HDFS應用都須要進行統一的管理,也就是說,經過Yarn,各個應用就能夠互不干擾的運行在同一個Hadoop系統中,共享整個集羣資源。   須要注意的是,2.0中的MapReduce和第一代的MapReduce的編程接口、數據處理引擎(map task和reduce task)是徹底同樣的,能夠認爲MRv2重用了MRv1中的這些模塊,不一樣的是資源管理和做業調度系統。MRv1中資源調度和做業管理均由JobTracker實現,集兩個功能於一身;而在MRv2中,將這兩部分分開了,其中做業管理由ApplicationMaster實現,而資源管理由新增的系統Yarn(ResourceManager)完成。因爲Yarn具備通用性,所以Yarn也能夠做爲其餘計算框架的資源管理系統,好比spark、storm。   Yarn整體上仍舊採用「主/從」架構,在整個集羣中,ResourceManager爲master,NodeManager爲slave,ResourceManager負責對整個NodeManager上的資源進行統一的管理和調度。
  2. ResourceManager:   ResourceManager是一個全局的資源管理器,負責對整個系統的資源進行管理和分配,它主要包含兩個組件:Scheduler(調度器)和ApplicationManager(應用程序管理器)。 -- ApplicationManager:   負責管理整個系統中的全部應用程序實例,包括在應用程序提交以後與Scheduler協商啓動ApplicationMaster、監控ApplicationMaster的運行狀態並在失敗時候重啓它等等; -- Scheduler:   它是一個純調度器,它只執行調度任務,不參與任何與具體應用程序相關的工做。它僅僅根據ApplicationMaster的具體需求進行集羣資源的分配。此外,Scheduler是一個可插拔的組件,用戶能夠自定義。
  3. ApplicationMaster:   用戶沒提交一個應用程序,ApplicationManager會找到一個能夠運行Container的NodeManager,並在這個Container中啓動一個ApplicationMaster實例,以後這個實例會向ApplicationManager進行註冊,將本身的更多信息告訴ApplicationManager,以便客戶端能和ApplicationMaster直接通訊得到任務運行的更多信息。ApplicationMaster會根據實際的任務需求向ResourceManager申請運行任務所須要的資源,以後Scheduler會根據請求分配具體的NodeManager上的Container給它管理。任務運行完成後,ApplicationMaster會向ApplicationManager申請註銷並關閉本身,同時歸還所佔用的全部系統資源。
  4. NodeManager:   NodeManager運行在集羣中的slave節點上,每個節點都會有本身的NodeManager。它是一個slave服務:負責接收ResourceManager的資源分配命令,將本身的某個具體Container分配給ApplicationMaster,管理每個本身的Container的生命週期。同時它還負責監控並報告本身Container中的狀態信息給ResourceManager,達到資源分配和監控的目的。
  5. Container:   Container是Yarn框架中具體的計算單元,執行具體的計算任務。一個Container就是一組待分配的系統資源,這組資源包括:CPU、內存。由於一個Container指的是某個節點上具體的計算資源,那麼Container中一定含有該計算資源的位置信息:位於哪一個機架上的哪一個NodeManager上面,因此咱們請求某一個Container的時候,其實是直接請求該臺機器上的CPU和內存資源。在Yarn框架中,一個application必須運行在一個或者多個Container之中,ResourceManager只負責告訴ApplicationMaster哪一個NodeManager上面的Container可使用,ApplicationMaster還得本身去NodeManager上去請求具體的Container。

(六)、寫一個Wordcount的MapReduce Java程序

0. 預覽被統計的文件格式

1. 新建maven項目,在pom.xml文件中添加下面的依賴和插件

<dependencies>
        <!-- https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-common -->
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-common</artifactId>
            <version>2.9.2</version>
        </dependency>

        <!-- https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-core -->
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-core</artifactId>
            <version>1.2.1</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <artifactId>maven-dependency-plugin</artifactId>
                <configuration>
                    <excludeTransitive>false</excludeTransitive>
                    <stripVersion>true</stripVersion>
                    <outputDirectory>./lib</outputDirectory>
                </configuration>
            </plugin>
        </plugins>
    </build>
複製代碼

2. 新建mapper內部類

public static class MyMapper extends Mapper<LongWritable, Text, Text, LongWritable> {
        @java.lang.Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            String words = value.toString();  // 傳進來一行數據,先將其轉換成string
            String[] wordsArr = words.split(",");  // 由於是csv文件,因此使用","做爲列分隔符
            context.write(new Text(wordsArr[0]), new LongWritable(1));  // 以省份做爲鍵值,value固定爲1
            context.write(new Text(wordsArr[0] + "," + wordsArr[1]), new LongWritable(1));  // 以(省份,市)做爲鍵值,value固定爲1
        }
    }
複製代碼

2. 新建reducer內部類

public static class MyReducer extends Reducer<Text, LongWritable, Text, LongWritable> {
    @Override
    protected void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException {
        /** * 輸入格式 : <word,[1,1,1,1]> */
        Long sum = 0L;
        for (LongWritable value : values
        ) {
            sum += value.get();  // 實現value的累加
        }
        context.write(key, new LongWritable(sum));  // 將結果輸出
    }
}
複製代碼

3. 實現主方法

public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
    // 1. 建立一個job
    Configuration conf = new Configuration();
    Job job = Job.getInstance(conf, "word-count");

    // 2. 將類名打包成Jar包
    job.setJarByClass(WordCount.class);

    // 3. 輸入文件地址
    FileInputFormat.addInputPath(job, new Path(args[0]));

    // 4. mapper處理邏輯
    job.setMapperClass(MyMapper.class);
    job.setMapOutputKeyClass(Text.class);
    job.setMapOutputValueClass(LongWritable.class);

    // 5. reducer處理邏輯
    job.setReducerClass(MyReducer.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(LongWritable.class);

    // 6. shuffle過程
    // 暫不處理

    // 7. 定義輸出地址
    FileOutputFormat.setOutputPath(job, new Path(args[1]));

    // 8. 運行結果顯示,若是成功就輸出"成功",不然輸出"失敗"
    boolean result = job.waitForCompletion(true);
    System.out.println(result ? "成功": "失敗");
}
複製代碼

4. 整個WordCount.java完成代碼:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class WordCount {

    public static class MyMapper extends Mapper<LongWritable, Text, Text, LongWritable> {
        LongWritable one = new LongWritable(1);

        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            String words = value.toString();
            String[] wordArr = words.split(",");  // 將每一行數據拆分紅每一個單詞
			// context.write(new Text(wordArr[0]), one); // 統計某個省的學校數量
            context.write(new Text(wordArr[0] + "," + wordArr[1]), one);  // 統計某個市的學校數量
        }
    }

    public static class MyReducer extends Reducer<Text, LongWritable, Text, LongWritable> {
        @Override
        protected void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException {
            /** * 輸入格式 : <word,[1,1,1,1]> */
            Long sum = 0L;
            for (LongWritable value : values
            ) {
                sum += value.get();  // 實現value的累加
            }
            context.write(key, new LongWritable(sum));  // 將結果輸出
        }
    }


    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        // 1. 建立一個job
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "word-count");

        // 2. 將類名打包成Jar包
        job.setJarByClass(WordCount.class);

        // 3. 輸入文件地址
        FileInputFormat.addInputPath(job, new Path(args[0]));

        // 4. mapper處理邏輯
        job.setMapperClass(MyMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(LongWritable.class);

        // 5. reducer處理邏輯
        job.setReducerClass(MyReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(LongWritable.class);

        // 6. shuffle過程
        // 暫不處理

        // 7. 定義輸出地址
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        // 8. 運行結果顯示,若是成功就輸出"成功",不然輸出"失敗"
        boolean result = job.waitForCompletion(true);
        System.out.println(result ? 1 : 0);
    }
}
複製代碼

5. 將代碼打包成jar包

  • 將以Extra開頭的依賴都刪除

  • 點擊build->build artifacts->build,填寫信息,就能獲得包裝好的jar包
  • 將此jar包上傳到集羣,執行如下命令:
​​
yarn jar ProvinceCount.jar WordCount hdfs://master:8020/test_data/senior_school_name.csv hdfs://master:8020/test_data/province_out/
# ProvinceCount.jar爲jar包名稱,WordCount爲主類,hdfs://master:8020/test_data/senior_school_name.csv爲輸入文件(這裏是HDFS上的文件),hdfs://master:8020/test_data/province_out/是結果輸出文件夾
複製代碼

6. 執行結果: 你能夠經過UI界面查看執行狀況:

job執行過程的日誌輸出以下圖所示:

[root@master hadoop-2.9.2]# yarn jar ProvinceCount.jar WordCount hdfs://master:8020/test_data/senior_school_name.csv hdfs://master:8020/test_data/province_out/
18/12/25 04:33:36 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
18/12/25 04:33:37 INFO client.RMProxy: Connecting to ResourceManager at master/192.168.111.132:8032
18/12/25 04:33:37 WARN mapreduce.JobResourceUploader: Hadoop command-line option parsing not performed. Implement the Tool interface and execute your application with ToolRunner to remedy this.
18/12/25 04:33:43 INFO input.FileInputFormat: Total input files to process : 1
18/12/25 04:33:45 INFO mapreduce.JobSubmitter: number of splits:1
18/12/25 04:33:45 INFO Configuration.deprecation: yarn.resourcemanager.system-metrics-publisher.enabled is deprecated. Instead, use yarn.system-metrics-publisher.enabled
18/12/25 04:33:47 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1545718642445_0006
18/12/25 04:33:47 INFO impl.YarnClientImpl: Submitted application application_1545718642445_0006
18/12/25 04:33:47 INFO mapreduce.Job: The url to track the job: http://master:8088/proxy/application_1545718642445_0006/
18/12/25 04:33:47 INFO mapreduce.Job: Running job: job_1545718642445_0006
18/12/25 04:34:42 INFO mapreduce.Job: Job job_1545718642445_0006 running in uber mode : false
18/12/25 04:34:42 INFO mapreduce.Job:  map 0% reduce 0%
18/12/25 04:36:12 INFO mapreduce.Job:  map 100% reduce 0%
18/12/25 04:36:21 INFO mapreduce.Job:  map 100% reduce 100%
18/12/25 04:36:22 INFO mapreduce.Job: Job job_1545718642445_0006 completed successfully
18/12/25 04:36:22 INFO mapreduce.Job: Counters: 49
	File System Counters
		FILE: Number of bytes read=660893
		FILE: Number of bytes written=1719115
		FILE: Number of read operations=0
		FILE: Number of large read operations=0
		FILE: Number of write operations=0
		HDFS: Number of bytes read=1536880
		HDFS: Number of bytes written=362
		HDFS: Number of read operations=6
		HDFS: Number of large read operations=0
		HDFS: Number of write operations=2
	Job Counters 
		Launched map tasks=1
		Launched reduce tasks=1
		Data-local map tasks=1
		Total time spent by all maps in occupied slots (ms)=84094
		Total time spent by all reduces in occupied slots (ms)=6551
		Total time spent by all map tasks (ms)=84094
		Total time spent by all reduce tasks (ms)=6551
		Total vcore-milliseconds taken by all map tasks=84094
		Total vcore-milliseconds taken by all reduce tasks=6551
		Total megabyte-milliseconds taken by all map tasks=86112256
		Total megabyte-milliseconds taken by all reduce tasks=6708224
	Map-Reduce Framework
		Map input records=38356
		Map output records=38356
		Map output bytes=584175
		Map output materialized bytes=660893
		Input split bytes=116
		Combine input records=0
		Combine output records=0
		Reduce input groups=31
		Reduce shuffle bytes=660893
		Reduce input records=38356
		Reduce output records=31
		Spilled Records=76712
		Shuffled Maps =1
		Failed Shuffles=0
		Merged Map outputs=1
		GC time elapsed (ms)=1346
		CPU time spent (ms)=12980
		Physical memory (bytes) snapshot=332660736
		Virtual memory (bytes) snapshot=4213764096
		Total committed heap usage (bytes)=137498624
	Shuffle Errors
		BAD_ID=0
		CONNECTION=0
		IO_ERROR=0
		WRONG_LENGTH=0
		WRONG_MAP=0
		WRONG_REDUCE=0
	File Input Format Counters 
		Bytes Read=1536764
	File Output Format Counters 
		Bytes Written=362
成功
[root@master hadoop-2.9.2]# 
複製代碼

查看HDFS文件系統中是否有文件輸出:

查看文件內容:

# 在集羣shell中輸入:
hdfs dfs -text /test_data/province_out/part-r-00000
# 結果以下:
複製代碼

(七)、NameNode和SecondaryNameNode細節:

NameNode:
  1. NameNode概述:   NameNode維護了文件和數據塊之間的映射關係數據塊和數據節點之間的映射關係,即一個文件由哪些數據塊組成、這些數據塊存放在哪些節點上。真正的數據都存放在DataNode上。   NameNode如何存儲這些信息呢?它主要維護兩個文件:fsimage系統快照文件,和editlog操做日誌文件。
  2. fsimage:   在HDFS啓動的時候會加載fsimage上的信息,是整個集羣的快照,包括目錄的修改時間、權限信息,和文件的數據塊描述、修改時間、被訪問時間信息等。fsimage保存了最新的元數據檢查點,怎麼理解呢?fsimage就是整個集羣資源的縮略圖,集羣中全部的資源都經過它的描述展現出來,若是由於某些緣由,致使新增了一個block,可是沒有經過fsimage和下面要說的editlog同步,那麼這個block就永遠不會被使用了。
  3. editlog:   editlog在NameNode啓動的狀況下對HDFS上進行的各類元數據操做進行記錄。客戶端對HDFS全部的更新操做,好比移動數據、刪除操做等,都會記錄在editlog中。
SecondaryNameNode
  1. SecondaryNameNode存在的意義:   剛纔提到了editlog,若是持續的進行HDFS的更新操做,那麼時間久了若是不將editlog和fsiamge進行合併,editlog會變得很是大;若是常常合併,這會消耗大量的系統資源,會致使NameNode在合併期間幾乎不可用。可是無論怎麼樣,合併是必須的,不能在NameNode上進行合格合併操做,那麼能夠在別的機器上進行,只要將合併後的fsimage文件傳回來就好了,這就是SecondaryNameNode存在的意義。
  2. SecondaryNameNode合併過程:
    SecondaryNameNode合併過程
    第一步,將HDFS的更新操做記錄寫入到一個新的文件:edits.new; 第二步,將NameNode上的全部editlog和fsimage發送到SecondaryNameNode; 第三步,在SecondaryNameNode上將發過來的editlog文件和fsimage合併成一個新文件:fsimage.ckpt。 第四步,將fsimage.ckpt發送回NameNode; 第五步,重命名fsimage.ckpt爲fsimage、edits.new爲edits。   另外有一點要提到的是,一旦NameNode宕機,SecondaryNameNode中其實還保存有一部分數據,還可以回覆一些挽回一點損失。

(八)、使用zookeeper配置高可用(HA)Hadoop集羣:

# 資源說明:
# 1. 三臺服務器,分別爲master1,master2,slave1
# 2. 使用master1和master2做爲HA中的namenode,只有slave1一個datanode
複製代碼

hdfs-site.xml配置

<property>
		<name>dfs.nameservices</name>
		<value>haojiCluster</value>
		<description>集羣服務ID</description>
	</property>
	
	<property>
		<name>dfs.ha.namenodes.haojiCluster</name>
		<value>master1,master2</value>
		<description>集羣服務ID內含有的namenode</description>
	</property>
	
	<property>
	    <name>dfs.namenode.rpc-address.haojiCluster.master1</name>
	    <value>master1:8020</value>
		<description>datanode和namenode RPC通訊地址1</description>
	</property>
	<property>
	    <name>dfs.namenode.rpc-address.haojiCluster.master2</name>
	    <value>master2:8020</value>
		<description>datanode和namenode RPC通訊地址1</description>
	</property>
	
	<property>
	    <name>dfs.namenode.http-address.haojiCluster.master1</name>
	    <value>master1:50070</value>
		<description>訪問namenode的http地址(好比ui界面)</description>
	</property>
	<property>
	    <name>dfs.namenode.http-address.haojiCluster.master2</name>
	    <value>master2:50070</value>
		<description>訪問namenode的http地址(好比ui界面)</description>
	</property>
	
	<property>
		<name>dfs.namenode.shared.edits.dir</name>
	    <value>qjournal://master1:8485;master2:8485;slave1:8485/haojiCluster</value>
		<description>配置journalnode集羣的訪問地址</description>
	</property>
	
	<property>
		<name>dfs.client.failover.proxy.provider.haojiCluster</name>
		<value>org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider</value>
		<description>配置dfs客戶端,用來判斷哪一個namenode處於活躍狀態</description>
	</property>
	
	<property>
        <name>dfs.ha.fencing.methods</name>
        <value>sshfence</value>
		<description>爲了防止腦裂現象,須要配置一個解決方案,讓備用的那個namenode可以經過這個方式去殺掉那個進程,好比ssh</description>
    </property>
    <property>
        <name>dfs.ha.fencing.ssh.private-key-files</name>
        <value>~/.ssh/id_rsa</value>
		<description>既然須要ssh,那麼就須要ssh免密登陸</description>
    </property>
	
	<property>
	    <name>dfs.journalnode.edits.dir</name>
	    <value>/usr/local/hadoop-2.9.2/journal_data</value>
		<description>journal節點目錄</description>
	</property>
	
	<property>
        <name>dfs.replicatio</name>
        <value>3</value>     
		<description>集羣副本數</description>
    </property>
	
    <property>
        <name>dfs.namenode.name.dir</name>
        <value>file:////usr/local/hadoop-2.9.2/hdfs/name</value>     
		<description>hadoop的name目錄路徑</description>
    </property>

    <property>
        <name>dfs.datanode.data.dir</name>
        <value>file:////usr/local/hadoop-2.9.2/hdfs/data</value> 
		<description>hadoop的data目錄路徑</description>
    </property>
	
    <property>
        <name>dfs.namenode.servicerpc-address</name>
        <value>master1:10000</value>     
	<description>hadoop的name目錄路徑</description>
    </property>
	
    <property>
        <name>dfs.webhdfs.enabled</name>
        <value>true</value>  	   
		<description>指定在namenode和DataNode之間是否開啓webHDFS功能</description>
    </property>
	
    <property>
		<name>dfs.permissions</name>
        <value>false</value>
		<description>經過UI操做hdfs時是否須要權限認證</description>
    </property>
	
	<property>
	    <name>dfs.ha.automatic-failover.enabled</name>
		<value>true</value>
		<description>自動failover啓動</description>
	</property>
複製代碼

core-site.xml

<property>
	   <name>ha.zookeeper.quorum</name>
	   <value>master1:2181,master2:2181,slave1:2181</value>
	   <description>zookeeper集羣訪問地址</description>
	</property>
	
	<property>
	   <name>fs.defaultFS</name>
	   <value>hdfs://haojiCluster</value>
	   <description>集羣對外訪問ID,客戶端拿着這個ID去訪問zookeeper集羣查出處於活躍狀態的namenode的ip和端口,再進行鏈接</description>
	</property>
	
	<property>
	   <name>ha.zookeeper.session-timeout.ms</name>
	   <value>30000</value>
	   <description>zkfc超過這個時間連不上zookeeper就會自動退出,默認5s</description>
	</property>
	
	<property>	
	    <name>ipc.client.connect.max.retries</name>
	    <value>20</value>
	    <description>Indicates the number of retries a clientwill make to establisha server connection.</description>
  	</property>

  	<property>
	   <name>ipc.client.connect.retry.interval</name>
	   <value>5000</value>
	   <description>Indicates the number of milliseconds aclient will wait for before retrying to establish a server connection.</description>
  	</property>

	<property>
	   <name>hadoop.tmp.dir</name>
	   <value>/usr/local/hadoop-2.9.2/tmp</value>
	   <description></description>
	</property>
複製代碼

將這個文件複製到其餘集羣 具體操做

cd /usr/local/hadoop-2.9.2
# 0. 全部節點上啓動journal node
sbin/hadoop-daemon.sh start journalnode
# 1. master1上初始化namenode
bin/hdfs namenode -format
# 2. 在master1上啓動namenode
sbin/hadoop-daemon.sh start namenode
# 3. master2上初始化另外一個namenode
bin/hdfs namenode -bootstrapStandby
# 4. master2上啓動namenode
sbin/hadoop-daemon.sh start namenode
# 5. 啓動zk集羣
$ZOOKEEPER_HOME/bin/zkServer.sh start
# 6. 在任何一個能鏈接上zk的機器上格式化zk(建立hadoop-ha znode節點)
bin/hdfs zkfc -formatZK
# 7. 在兩個namenode上啓動zkfc
sbin/hadoop-daemon.sh --script bin/hdfs start zkfc
複製代碼

  打開每個namenode的UI界面,能夠發現至少有一個namenode處於active狀態,另外的處於standby狀態。須要注意的是,處於standby的節點,沒有open權限,也就是說不能查看集羣上的具體文件結構。   這個時候已經啓動了HA集羣,能夠作個實驗:將master1直接kill -9,而後發現master1的namenode進程退出,而master2的狀態由standby變成active。

相關文章
相關標籤/搜索