這個問題的答案來自於計算機硬盤的發展趨勢:尋址時間遠遠比不上傳輸速率的提高。尋址是將磁頭移動到特定的硬盤位置進行讀或者寫操做,而傳輸速率取決於硬盤的帶寬。而尋址就是致使磁盤操做延遲的主要緣由。 當數據量很小時,傳統的數據庫依靠B樹(一種數據結構,主要用於關係型數據庫索引),能夠實現快速的讀取和更新;可是當數據量很大時,由於須要不少的「排序/合併」操做,傳統的數據庫系統就明顯落後於MapReduce了。 java
簡而言之,他們認爲:1. MapReduce放棄了 不少那些通過歷代數據庫專家優化提出的高性能數據庫技術,好比批量導入、索引、視圖、更新、事物等;2. MapReduce是一個粗糙的實現,它沒有索引,依靠蠻力做爲處理選項;3. MapReduce並不稀奇,以前就已經有人在使用類似的概念而且作出產品來了。 node
結構化數據是指具備既定格式的實體化數據,好比XML文檔或者知足特定預約義格式的數據表。這是RDBMS(關係型數據庫)包含的技術。 半結構化數據比較鬆散,它可能有格式,可是常常被忽略,因此通常只能做爲對數據結構的通常性指導。好比Excel電子表格,它在結構上是單元格組成的網格,可是你能夠在單元格中保存任何格式的數據。 非結構化數據沒有什麼特別的內部結構,例如純文本或者圖像數據。 Hadoop對半結構化和非結構化數據很是有效,由於它是在處理數據時纔對數據進行解釋(所謂的「讀時模式」)。這種模式在提供靈活的同時,避免了RDBMS在數據加載階段帶來的高開銷,由於這在Hadoop中僅僅是一個簡單的文件拷貝操做。(這也是爲何Hadoop能夠進行高速流式讀/寫的緣由) 關係型數據每每是規範的,,這主要是爲了數據的完整性且不含冗餘。可是規範性會給Hadoop處理帶來麻煩,由於它使記錄變成非本地操做,而Hadoop的核心假設之一恰恰就是能夠進行(高速的)流讀/寫操做。(暫時還沒理解這句話的意思)而WEB日誌就是典型的非規範化數據,例如每次都要記錄客戶端主機的名字和IP,這會致使同一個客戶端的全名可能出現屢次。 MapReduce以及Hadoop中其餘的處理模型都是能夠隨着數據規模現行伸縮的(可擴展性)。對數據分區後,函數原語(如map程序和reduce程序)可以在各個分區上面並行工做。這意味着,若是輸入的數據量是原來的兩倍,那麼做業的運行時間也須要兩倍。可是若是集羣的規模擴展爲原來的兩倍,那麼做業的速度依然能夠變得和原來同樣快。(分數不夠,拿錢來湊...)可是SQL查詢不具有這樣的特性。 2. 高性能計算和Hadoop計算web
高性能計算相關的組織多年以來一直研究大規模的數據處理,主要使用相似於消息傳遞接口MPI的相關API。 從廣義上講,高性能計算採用的方法是將做業分散到集羣的各臺機器上,這些機器訪問存儲區域網絡(SAN)組成的共享文件系統。這比較適合計算密集型的做業。可是若是節點須要訪問的數據量更龐大,不少計算節點就會由於網絡帶寬的瓶頸而不得不閒下來等數據。 而對於Hadoop而言,它儘可能在計算節點上存儲數據,以實現數據的本地快速訪問。這在Hadoop中稱爲數據本地化。意識到網絡帶寬是數據中心環境最珍貴的資源(處處複製數據很容易耗盡網絡帶寬)以後,Hadoop採用顯式網絡拓撲結構來保留網絡帶寬(將網絡當作一棵樹,兩個節點之間的距離使他們到共同的祖先的距離綜合)。 3. MapReduce數據流(P31 2.4 橫向擴展) 首先定義一些術語:MapReduce做業(Job)是客戶端須要執行的一個工做單元,它包括輸入數據、MapReduce程序和配置信息。而Hadoop將做業分紅若干任務(task)來執行,任務有兩類:map任務和reduce任務。這些任務運行在集羣的節點上,並經過Yarn調度。若是一個任務失敗,它Yarn將在另一個不一樣的節點上自動從新調度運行。Hadoop將MapReduce的輸入數據劃分紅等長的數據塊,稱爲「輸入分片」,簡稱「分片」。 算法
這個數據塊的大小應該不能超過HDFS的塊大小,緣由是:1. 數據本地化。Hadoop在存儲有HDFS數據的節點上運行Map任務能夠得到最佳性能,由於它無需使用寶貴的集羣帶寬資源;若是在本地找不到,Yarn就會調度同一個機架上的空閒slot來運行該任務,此時屬於同機架內不一樣節點間調度,有時候還會跨機架調度,可是這種狀況幾乎見不到。2. 若是分片後的數據跨過了兩個HDFS塊,因爲Hadoop的容災機制,任何一個節點都不可能同時存儲這兩個數據塊,所以不免會跨節點甚至是跨機架,效率會變得很低。 shell
由於map的輸出是中間結果,這個中間結果還要通過reduce處理後才能產生最終結果。並且一旦map做業成功完成,這個中間結果就會被刪除;一旦執行失敗,就會被從新調度運行。若是把這個中間結果存儲在HDFS上並實現備份,未免有點小題大作。 數據庫
集羣的帶寬資源很寶貴,所以儘可能避免map任務和reduce任務之間的數據傳輸是有必要的。Hadoop容許用戶針對map的輸出指定一個combiner(就像map同樣),combiner的輸入來自map,輸出做爲reduce的輸入。combiner屬於優化方案,因此有沒有combiner、或者調用多少次combiner,reduce輸出的結果都應該是同樣的。 簡單理解,combiner至關於對map的輸出結果進行一個相似於reduce的補充運算,以此來減小mapper和reducer之間的數據傳輸量。舉一個例子,咱們的目的是統計1950年全國最高氣溫,第一個map的輸出爲((1950,0),(1950,10),(1950,20)),第二個map的輸出爲((1950,25),(1950,20)),沒有combiner的時候,reduce的輸入爲(1950,[0,10,20,25,20]),最後的輸出爲(1950,25)。當咱們使用combiner以後,第一個map的輸出爲(1950,20),第二個map的輸出爲(1950,25),那麼reduce的輸入爲(1950,[20,25]),最後仍是輸出(1950,25)。更簡單的說,咱們能夠經過下面的表達式來講明這個計算過程: 沒有使用combiner:max(0,10,20,25,20)=25。 沒有使用combiner:max(max(0,10,20),max(25,20))=max(20,25)=25。 4. 使用本身喜歡的語言實現MapReduce程序(P37 2.5 HadoopStreaming)apache
Hadoop提供了MapReduce的API,容許編程人員使用其餘的非Java語言來寫本身的map函數和reduce函數。Hadoop Streaming 使用 Unix標準輸入輸出流做爲Hadoop和應用程序之間的接口,因此,任何能進行Unix輸入與輸出的編程語言都能實現MapReduce程序。 map函數的輸入來自標準輸入,輸出結果寫到標準輸出。map輸出的鍵-值對是以一個製表符(\t)分割的行,而且是通過對鍵排序的。 reduce的輸入來自於標準輸入,輸出結果寫到標準輸出(Hadoop能夠接受此標準輸出,並將其持久化)。reduce的輸入與map的輸出相同。 5. HDFS(P42 3.1 HDFS的設計)編程
剛開始我對這塊也很好奇:爲何該接口不在確認存入第一個節點以後就返回結果,以後節點之間採用異步的方式將副本同步?後面我看到了這句話:只要寫入了dfs.namenode.replication.min的副本數(默認爲1),寫操做就會成功,而且這個塊能夠在集羣中異步複製,直到達到其目標副本數(dfs.replication的默認值爲3)。 namenode選擇在哪一個datanode上存儲副本,要從可靠性、寫入帶寬、讀取帶寬之間進行權衡。通常的作法是:第一個副本放在運行客戶端的節點上(若是運行客戶端的節點不是datanode,那麼就隨機選擇一個不那麼忙也不那麼滿的節點);第二個節點存放在與第一個節點不一樣的機架上的任意一個節點;第三個節點存放在與第二個節點相同機架的另外一任意節點。 6. HDFS的I/O操做(P96 5.1)bootstrap
寫數據時:由DataNode負責對所要存儲的數據的驗證操做。寫數據的客戶端將數據以及校驗和發送給一系列DataNode組成的管線,管線中的最後一個DataNode負責校驗操做。若是最後一個DataNode檢測到錯誤,客戶端就會收到一個IOException。 讀數據時:由客戶端進行驗證操做。每個DataNode都會保存一個用於驗證的校驗和日誌,裏面記錄每個數據塊的最後一個驗證時間。客戶端收到數據以後進行驗證,若是成功,則修改該日誌;若是失敗,會執行下面的操做:1. 向NameDode彙報出錯的數據塊block以及這個數據塊所在的DataNode,同時拋出ChecksumException異常;2. NameNode將這個block標記爲已損壞,這樣它就不會再將一樣的客戶端請求發送到這個節點,同時嘗試將這個block的一個副本複製到另外的DataNode,使得數據塊的副本因子回到指望值;3. 將這個已經損壞的block刪除。 - 序列化操做?bash
序列化是指將結構化對象轉換爲字節流以便在網絡上傳輸或者寫入到磁盤中進行永久存儲的過程。反序列化是指將字節流轉回結構化對象的逆過程。 序列化在分佈式數據處理有兩大應用場景:進程間通訊和永久存儲。 在Hadoop生態中,系統中多個節點上的進程之間的通訊是經過**RPC(Remote Produce Call,遠程過程調用)**實現的。RPC協議將消息序列化爲二進制流以後發送到遠端節點,遠端節點接收到二進制流以後將其反序列化爲原始消息。一般狀況下,RPC序列化有如下特色:緊湊、快速、可擴展、支持互操做。Hadoop編程中使用Writable接口實現序列化。
具體由輸入文件數目、輸入文件大小、配置的參數決定的。 首先了解配置參數:
mapreduce.input.fileinputformat.split.minsize // 啓動map的最小split size ,默認爲0
mapreduce.input.fileinputformat.split.maxsize // 啓動map的最大split size ,默認爲256
dfs.block.size // Hadoop系統中的block塊大小,默認爲128M
splitsize = Math.max(minsize, Math.min(maxSize,blockSize))
複製代碼
例如:默認狀況下,一個輸入文件800M,那麼mapper的數量應該爲7個,其中6個大小爲128M,1個大小爲32M; 再例如:一個目錄下有三個文件,大小分別爲5M、10M和150M,那麼這個時候會產生四個mapper,它們所處理的數據大小分別爲5M、10M、128M和22M; 固然咱們也能自定義mapper的數量。好比使上面的mapper數量變成2,一個處理大小爲128M、另外一個處理大小爲37M(5 + 10 + 22),具體實現能夠經過設置具體的參數。 2. 環形緩衝區細節:
map task的任務輸出首先會進入到一個緩衝區內,這個緩衝區默認大小爲100M,當緩衝區達到容量的80%的時候,一個單獨的守護進程——spill(溢寫)進程會將緩衝區的內容溢寫(spill)到本地磁盤。溢寫是一個單獨的進程,不會影響map端的繼續輸出,可是當溢寫的過程當中寫入速度過快致使緩衝區滿,那麼map的寫操做就會被阻塞,直到溢寫完成。 3. partition細節:
首先要知道的是,默認狀況下,無論有多少個map任務,一個reduce任務只會產生一個輸出文件。可是有時候咱們須要將最終的輸出數據分散到不一樣的文件中去,好比按照省份劃分,將同一個省份的數據寫入同一個文件中,最終有多個文件。而最終的數據來源於reduce,也就是說,若是要獲得多個文件,意味着須要一樣數量的reducer運行。而reduce的數據輸入來自於map,也就是說,咱們要實現多個reduce,得根據map的不一樣輸出作手腳,將不一樣的map輸出按照自定義規則分配給不一樣的reduce任務。而map任務劃分數據的過程稱爲partition。 partition就是提早對輸入進行處理,根據自定義的reduce進行分區,到了reduce處理的時候,只須要處理對應的分區數據就好了。 默認的partition方法以下所示:
public int getPartition(k Key, v Value, int numReduceTasks){
return (key.hashCode() & Inter.MAX_VALUE) % numReduceTasks;
}
複製代碼
前面括號內的結果表示將key的hash值變成一個非負值。numReduceTasks指的是reducer的數量,默認值是1。由於任何一個非負整數除以1的結果是0,也就是說,在默認狀況下,getPartition方法的返回值老是1。也就是說,mapper的任務輸出老是送給一個reducer,最終只能輸出到一個文件中去。這裏須要注意的是,若是numReduceTasks的數量爲0,那麼map會將結果直接刷寫到HDFS上去做進一步處理。 固然咱們能夠重載此方法,進而實現自定義的partition。 對map輸出的每個鍵值對,系統都會經過這個方法給定一個partition。若是一個鍵值對的partition值爲0,那麼它將會交給第1個reducer去處理,partition值爲1,會交給第二個reducer去處理。 4. sort和spill細節:
當溢寫線程啓動的時候,須要對緩衝區中80%的數據作排序。這裏的排序是針對序列化的字節根據鍵值作排序,使用Hadoop本身定義的排序算法,具體實現細節爲快速排序法(先按照分區編號partition進行排序,再按照key進行排序),這樣的結果是,數據以分區彙集在一塊兒,而且同一個分區中的數據按照key有序。 溢寫的時候,按照分區編號由小到大依次將每一個分區中的數據寫入到任務工做目錄的臨時文件output/spillN.out(N表示當前溢寫的次數)中。 若是設置了combiner,那麼這個時候也會進行combiner操做。combiner會將形同key的key/value對的value按照設定的操做,好比加起來,減小溢寫到磁盤的數據量。combiner會優化map的輸出結果,可是不能對reduce的輸出結果產生影響,在這個前提下,combiner是個好東西,會在模型中屢次使用。 在優化的時候,爲了儘可能減小對磁盤文件的I/O操做,能夠在這一步對spill的文件進行壓縮,使其編程IFile格式,這也將有利於後面的merge操做,由於merge的時候須要根據spill後的文件大小進行排序操做。 5. merge細節:
每一次的spill都會產生一個spill文件,因此map task計算的時候會不斷產生不少的spill文件,在map task結束以前會對這些spit文件進行合併造成一個已分區而且排序的輸出文件(能夠控制一次可以合併多少流,默認是10)0,最終的文件也只有一個,這個過程就是merge。 讓每個map task最終只生成一個數據文件,能夠避免同時打開大量文件金額同時讀取大量小文件產生的隨機讀取帶來的開銷。 每個map task都有一個緩衝區,存儲着map的輸出結果,當緩衝區快滿的時候須要將緩衝區的數據以臨時文件的形式存放到磁盤,當整個map task結束後再對磁盤中這個map task產生的全部臨時文件作合併,生成最終的正式輸出文件,而後等待reduce task來拉取數據。 merge採用多輪遞歸合併的方式,首先根據全部文件的大小創建小根堆,而後選取前十個元素,依次迭代讀取spill下來的文件中的key-value,並將生成的文件又放回到原來的文件中,以後再次創建小根堆,再執行上面的操做。因此merge的過程能夠當作建堆=>選取前十個元素迭代合併=>再建堆=>再合併……
shuffle要作的事是怎麼把map task的輸出結果有效地傳送到reduce端,或者能夠這樣理解,shuffle描述着數據從map task到reduce task輸入的這段過程。 在map處理階段結束後,會將多個文件合併成一個文件,這個文件中相同分區的數據放在一塊兒,同一個分區中的數據按照key有序。在reduce階段須要從多個map task中獲取屬於該reduce的分區的結果值,而後根據獲取到的文件大小決定放在內存中仍是刷寫到磁盤中,也就是說,從每個map中獲取的數據是一段內存或者一個文件,而後對內存和文件進行和map階段類似的merge操做,將結果中的每一行key/value執行reduce函數。
如上圖所示Map階段須要對多個文件的數據合併,在Reduce階段Reduce Task 1向各個Map獲取分區1的數據,Reduce Task 2向各個Map獲取分區2的數據,Reduce Task 3向各個Map獲取分區3的數據,圖中爲了簡便,Reduce Task3向map獲取數據的指向沒有標識。 注意:reduce從map獲取的是一個out文件中對應的partition數據,而不是整個out文件!!!咱們的目的是統計兩個文件中每一個單詞出現的總次數。 首先建立兩個文件,做爲咱們的輸入:
file 1:
His name is Tom
Tom comes from Yunge
file 2:
His name is Jerry
Jerry comes from Lingge
複製代碼
map,映射,也就是拆解的意思。 咱們的輸入是兩個文件,在默認狀況下,會產生兩個split,也就是兩個mapper:mapper1和mapper2。 接下來,這兩個mapper會分別將文件內容分解爲單詞和1(注意,這裏的1不是具體數量,只是數字1),其中單詞是咱們的主鍵也就是key,後面的數字就是對應的value。 那麼每個mapper對應的輸出爲: mapper1:
His 1
name 1
is 1
Tom 1
Tom 1
comes 1
from 1
Yunge 1
複製代碼
mapper2:
His 1
name 1
is 1
Jerry 1
Jerry 1
comes 1
from 1
Lingge 1
複製代碼
partition,分區。爲何要分區?由於後面會有多個reducer,每個reducer只幹本身的事,這回=會讓效率提高很多。partition就是提早對輸入進行處理,根據自定義的reduce進行分區,到了reduce處理的時候,只須要處理對應的分區數據就好了。 那麼如何分區呢?主要依據就是按照key將數據按照reduce分紅對應數量的組,就像彙總硬幣同樣,一元的進入1號桶,5角的進入2號桶,一角的進入三號桶。這個很重要的一點是須要保證key的惟一性,所以最多見的方法就是使用hash函數。這裏咱們假設有兩個reducer,咱們將首字母對應的字母順序進行除2取模,所以每個mapper進行partition以後的結果以下:
mapper1:
partition 1:
His 1
is 1
comes 1
from 1
partition 2:
name 1
Tom 1
Tom 1
Yunge 1
複製代碼
mapper2:
partition 1:
His 1
is 1
Jerry 1
Jerry 1
comes 1
from 1
Lingge 1
partition 2:
name 1
複製代碼
其中partition 1是給reducer 1處理的,partition 2是給reducer 2處理的。 能夠看到,partition只是按照key進行了簡單的分區,並無任何別的處理,而且每個分區中的key不會出如今另外一個分區裏面。
sort,排序。由於後面的reducer也會作排序,可是它只是作一個歸併排序,要求每個mapper的輸出結果也是基於key有序的。這裏咱們根據首字母進行字典排序:
mapper1:
partition 1:
comes 1
from 1
His 1
is 1
partition 2:
name 1
Tom 1
Tom 1
Yunge 1
複製代碼
mapper2:
partition 1:
comes 1
from 1
His 1
is 1
Jerry 1
Jerry 1
Lingge 1
partition 2:
name 1
複製代碼
能夠看到,每個partition中的數據都按照key作了排序。
combine能夠理解成一個mini reducer,它發生在spill到本次磁盤過程以前,目的就是把送到reducer的數據實現進行一次計算,以減小文件大小、減小對網絡帶寬的消耗。可是要注意的是,combine操做是可選的,若是要加上,請務必保證通過combine以後的數據不會對最終的reduce結果產生影響。下面咱們執行combine:
mapper1:
partition 1:
comes 1
from 1
His 1
is 1
partition 2:
name 1
Tom 2
Yunge 1
複製代碼
mapper2:
partition 1:
comes 1
from 1
His 1
is 1
Jerry 2
Lingge 1
partition 2:
name 1
複製代碼
由於最後reducer執行的操做是add,那麼提早add和後面add的效果是同樣的,所以這個combiner是有效的。能夠看到,結果中,對重複的單詞進行了簡單的彙總。
copy,也叫shuffle,就是reducer本身從mapper拉去數據。每個reducer只拉取屬於本身partition的數據,結果以下:
reducer 1:
partition 1:(來自mapper1)
comes 1
from 1
His 1
is 1
partition 1:(來自mapper2)
comes 1
from 1
His 1
is 1
Jerry 2
Lingge 1
複製代碼
reducer 2:
partition 2:(來自mapper1)
name 1
Tom 2
Yunge 1
partition 2:(來自mapper2)
name 1
複製代碼
能夠看到,經過shuffle操做,相同partition的數據落到了同一個節點(reducer)上。
merge,合併。將reducer獲得的文件合併成同一個文件,須要注意的是,這個過程也包含了排序。結果以下:
reducer 1:
comes 1
comes 1
from 1
from 1
His 1
His 1
is 1
is 1
Jerry 2
Lingge 1
複製代碼
reducer2:
partition 2:
name 1
name 1
Tom 2
Yunge 1
複製代碼
reduce,歸併。最終的一步,將相同的key的value加1。結果以下圖:
reducer 1:
comes 2
from 2
His 2
is 2
Jerry 2
Lingge 1
複製代碼
reducer2:
partition 2:
name 1
Tom 2
Yunge 1
複製代碼
大功告成!咱們統計了兩個文件中每個單詞的數目,由於有兩個reducer,所以輸出結果會有兩個文件,即part-000000和part-00001。
0. 預覽被統計的文件格式
1. 新建maven項目,在pom.xml文件中添加下面的依賴和插件<dependencies>
<!-- https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-common -->
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>2.9.2</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-core -->
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-core</artifactId>
<version>1.2.1</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<artifactId>maven-dependency-plugin</artifactId>
<configuration>
<excludeTransitive>false</excludeTransitive>
<stripVersion>true</stripVersion>
<outputDirectory>./lib</outputDirectory>
</configuration>
</plugin>
</plugins>
</build>
複製代碼
2. 新建mapper內部類
public static class MyMapper extends Mapper<LongWritable, Text, Text, LongWritable> {
@java.lang.Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
String words = value.toString(); // 傳進來一行數據,先將其轉換成string
String[] wordsArr = words.split(","); // 由於是csv文件,因此使用","做爲列分隔符
context.write(new Text(wordsArr[0]), new LongWritable(1)); // 以省份做爲鍵值,value固定爲1
context.write(new Text(wordsArr[0] + "," + wordsArr[1]), new LongWritable(1)); // 以(省份,市)做爲鍵值,value固定爲1
}
}
複製代碼
2. 新建reducer內部類
public static class MyReducer extends Reducer<Text, LongWritable, Text, LongWritable> {
@Override
protected void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException {
/** * 輸入格式 : <word,[1,1,1,1]> */
Long sum = 0L;
for (LongWritable value : values
) {
sum += value.get(); // 實現value的累加
}
context.write(key, new LongWritable(sum)); // 將結果輸出
}
}
複製代碼
3. 實現主方法
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
// 1. 建立一個job
Configuration conf = new Configuration();
Job job = Job.getInstance(conf, "word-count");
// 2. 將類名打包成Jar包
job.setJarByClass(WordCount.class);
// 3. 輸入文件地址
FileInputFormat.addInputPath(job, new Path(args[0]));
// 4. mapper處理邏輯
job.setMapperClass(MyMapper.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(LongWritable.class);
// 5. reducer處理邏輯
job.setReducerClass(MyReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(LongWritable.class);
// 6. shuffle過程
// 暫不處理
// 7. 定義輸出地址
FileOutputFormat.setOutputPath(job, new Path(args[1]));
// 8. 運行結果顯示,若是成功就輸出"成功",不然輸出"失敗"
boolean result = job.waitForCompletion(true);
System.out.println(result ? "成功": "失敗");
}
複製代碼
4. 整個WordCount.java完成代碼:
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
public class WordCount {
public static class MyMapper extends Mapper<LongWritable, Text, Text, LongWritable> {
LongWritable one = new LongWritable(1);
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
String words = value.toString();
String[] wordArr = words.split(","); // 將每一行數據拆分紅每一個單詞
// context.write(new Text(wordArr[0]), one); // 統計某個省的學校數量
context.write(new Text(wordArr[0] + "," + wordArr[1]), one); // 統計某個市的學校數量
}
}
public static class MyReducer extends Reducer<Text, LongWritable, Text, LongWritable> {
@Override
protected void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException {
/** * 輸入格式 : <word,[1,1,1,1]> */
Long sum = 0L;
for (LongWritable value : values
) {
sum += value.get(); // 實現value的累加
}
context.write(key, new LongWritable(sum)); // 將結果輸出
}
}
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
// 1. 建立一個job
Configuration conf = new Configuration();
Job job = Job.getInstance(conf, "word-count");
// 2. 將類名打包成Jar包
job.setJarByClass(WordCount.class);
// 3. 輸入文件地址
FileInputFormat.addInputPath(job, new Path(args[0]));
// 4. mapper處理邏輯
job.setMapperClass(MyMapper.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(LongWritable.class);
// 5. reducer處理邏輯
job.setReducerClass(MyReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(LongWritable.class);
// 6. shuffle過程
// 暫不處理
// 7. 定義輸出地址
FileOutputFormat.setOutputPath(job, new Path(args[1]));
// 8. 運行結果顯示,若是成功就輸出"成功",不然輸出"失敗"
boolean result = job.waitForCompletion(true);
System.out.println(result ? 1 : 0);
}
}
複製代碼
5. 將代碼打包成jar包
yarn jar ProvinceCount.jar WordCount hdfs://master:8020/test_data/senior_school_name.csv hdfs://master:8020/test_data/province_out/
# ProvinceCount.jar爲jar包名稱,WordCount爲主類,hdfs://master:8020/test_data/senior_school_name.csv爲輸入文件(這裏是HDFS上的文件),hdfs://master:8020/test_data/province_out/是結果輸出文件夾
複製代碼
6. 執行結果: 你能夠經過UI界面查看執行狀況:
job執行過程的日誌輸出以下圖所示:[root@master hadoop-2.9.2]# yarn jar ProvinceCount.jar WordCount hdfs://master:8020/test_data/senior_school_name.csv hdfs://master:8020/test_data/province_out/
18/12/25 04:33:36 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
18/12/25 04:33:37 INFO client.RMProxy: Connecting to ResourceManager at master/192.168.111.132:8032
18/12/25 04:33:37 WARN mapreduce.JobResourceUploader: Hadoop command-line option parsing not performed. Implement the Tool interface and execute your application with ToolRunner to remedy this.
18/12/25 04:33:43 INFO input.FileInputFormat: Total input files to process : 1
18/12/25 04:33:45 INFO mapreduce.JobSubmitter: number of splits:1
18/12/25 04:33:45 INFO Configuration.deprecation: yarn.resourcemanager.system-metrics-publisher.enabled is deprecated. Instead, use yarn.system-metrics-publisher.enabled
18/12/25 04:33:47 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1545718642445_0006
18/12/25 04:33:47 INFO impl.YarnClientImpl: Submitted application application_1545718642445_0006
18/12/25 04:33:47 INFO mapreduce.Job: The url to track the job: http://master:8088/proxy/application_1545718642445_0006/
18/12/25 04:33:47 INFO mapreduce.Job: Running job: job_1545718642445_0006
18/12/25 04:34:42 INFO mapreduce.Job: Job job_1545718642445_0006 running in uber mode : false
18/12/25 04:34:42 INFO mapreduce.Job: map 0% reduce 0%
18/12/25 04:36:12 INFO mapreduce.Job: map 100% reduce 0%
18/12/25 04:36:21 INFO mapreduce.Job: map 100% reduce 100%
18/12/25 04:36:22 INFO mapreduce.Job: Job job_1545718642445_0006 completed successfully
18/12/25 04:36:22 INFO mapreduce.Job: Counters: 49
File System Counters
FILE: Number of bytes read=660893
FILE: Number of bytes written=1719115
FILE: Number of read operations=0
FILE: Number of large read operations=0
FILE: Number of write operations=0
HDFS: Number of bytes read=1536880
HDFS: Number of bytes written=362
HDFS: Number of read operations=6
HDFS: Number of large read operations=0
HDFS: Number of write operations=2
Job Counters
Launched map tasks=1
Launched reduce tasks=1
Data-local map tasks=1
Total time spent by all maps in occupied slots (ms)=84094
Total time spent by all reduces in occupied slots (ms)=6551
Total time spent by all map tasks (ms)=84094
Total time spent by all reduce tasks (ms)=6551
Total vcore-milliseconds taken by all map tasks=84094
Total vcore-milliseconds taken by all reduce tasks=6551
Total megabyte-milliseconds taken by all map tasks=86112256
Total megabyte-milliseconds taken by all reduce tasks=6708224
Map-Reduce Framework
Map input records=38356
Map output records=38356
Map output bytes=584175
Map output materialized bytes=660893
Input split bytes=116
Combine input records=0
Combine output records=0
Reduce input groups=31
Reduce shuffle bytes=660893
Reduce input records=38356
Reduce output records=31
Spilled Records=76712
Shuffled Maps =1
Failed Shuffles=0
Merged Map outputs=1
GC time elapsed (ms)=1346
CPU time spent (ms)=12980
Physical memory (bytes) snapshot=332660736
Virtual memory (bytes) snapshot=4213764096
Total committed heap usage (bytes)=137498624
Shuffle Errors
BAD_ID=0
CONNECTION=0
IO_ERROR=0
WRONG_LENGTH=0
WRONG_MAP=0
WRONG_REDUCE=0
File Input Format Counters
Bytes Read=1536764
File Output Format Counters
Bytes Written=362
成功
[root@master hadoop-2.9.2]#
複製代碼
查看HDFS文件系統中是否有文件輸出:
查看文件內容:# 在集羣shell中輸入:
hdfs dfs -text /test_data/province_out/part-r-00000
# 結果以下:
複製代碼
# 資源說明:
# 1. 三臺服務器,分別爲master1,master2,slave1
# 2. 使用master1和master2做爲HA中的namenode,只有slave1一個datanode
複製代碼
hdfs-site.xml配置
<property>
<name>dfs.nameservices</name>
<value>haojiCluster</value>
<description>集羣服務ID</description>
</property>
<property>
<name>dfs.ha.namenodes.haojiCluster</name>
<value>master1,master2</value>
<description>集羣服務ID內含有的namenode</description>
</property>
<property>
<name>dfs.namenode.rpc-address.haojiCluster.master1</name>
<value>master1:8020</value>
<description>datanode和namenode RPC通訊地址1</description>
</property>
<property>
<name>dfs.namenode.rpc-address.haojiCluster.master2</name>
<value>master2:8020</value>
<description>datanode和namenode RPC通訊地址1</description>
</property>
<property>
<name>dfs.namenode.http-address.haojiCluster.master1</name>
<value>master1:50070</value>
<description>訪問namenode的http地址(好比ui界面)</description>
</property>
<property>
<name>dfs.namenode.http-address.haojiCluster.master2</name>
<value>master2:50070</value>
<description>訪問namenode的http地址(好比ui界面)</description>
</property>
<property>
<name>dfs.namenode.shared.edits.dir</name>
<value>qjournal://master1:8485;master2:8485;slave1:8485/haojiCluster</value>
<description>配置journalnode集羣的訪問地址</description>
</property>
<property>
<name>dfs.client.failover.proxy.provider.haojiCluster</name>
<value>org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider</value>
<description>配置dfs客戶端,用來判斷哪一個namenode處於活躍狀態</description>
</property>
<property>
<name>dfs.ha.fencing.methods</name>
<value>sshfence</value>
<description>爲了防止腦裂現象,須要配置一個解決方案,讓備用的那個namenode可以經過這個方式去殺掉那個進程,好比ssh</description>
</property>
<property>
<name>dfs.ha.fencing.ssh.private-key-files</name>
<value>~/.ssh/id_rsa</value>
<description>既然須要ssh,那麼就須要ssh免密登陸</description>
</property>
<property>
<name>dfs.journalnode.edits.dir</name>
<value>/usr/local/hadoop-2.9.2/journal_data</value>
<description>journal節點目錄</description>
</property>
<property>
<name>dfs.replicatio</name>
<value>3</value>
<description>集羣副本數</description>
</property>
<property>
<name>dfs.namenode.name.dir</name>
<value>file:////usr/local/hadoop-2.9.2/hdfs/name</value>
<description>hadoop的name目錄路徑</description>
</property>
<property>
<name>dfs.datanode.data.dir</name>
<value>file:////usr/local/hadoop-2.9.2/hdfs/data</value>
<description>hadoop的data目錄路徑</description>
</property>
<property>
<name>dfs.namenode.servicerpc-address</name>
<value>master1:10000</value>
<description>hadoop的name目錄路徑</description>
</property>
<property>
<name>dfs.webhdfs.enabled</name>
<value>true</value>
<description>指定在namenode和DataNode之間是否開啓webHDFS功能</description>
</property>
<property>
<name>dfs.permissions</name>
<value>false</value>
<description>經過UI操做hdfs時是否須要權限認證</description>
</property>
<property>
<name>dfs.ha.automatic-failover.enabled</name>
<value>true</value>
<description>自動failover啓動</description>
</property>
複製代碼
core-site.xml
<property>
<name>ha.zookeeper.quorum</name>
<value>master1:2181,master2:2181,slave1:2181</value>
<description>zookeeper集羣訪問地址</description>
</property>
<property>
<name>fs.defaultFS</name>
<value>hdfs://haojiCluster</value>
<description>集羣對外訪問ID,客戶端拿着這個ID去訪問zookeeper集羣查出處於活躍狀態的namenode的ip和端口,再進行鏈接</description>
</property>
<property>
<name>ha.zookeeper.session-timeout.ms</name>
<value>30000</value>
<description>zkfc超過這個時間連不上zookeeper就會自動退出,默認5s</description>
</property>
<property>
<name>ipc.client.connect.max.retries</name>
<value>20</value>
<description>Indicates the number of retries a clientwill make to establisha server connection.</description>
</property>
<property>
<name>ipc.client.connect.retry.interval</name>
<value>5000</value>
<description>Indicates the number of milliseconds aclient will wait for before retrying to establish a server connection.</description>
</property>
<property>
<name>hadoop.tmp.dir</name>
<value>/usr/local/hadoop-2.9.2/tmp</value>
<description></description>
</property>
複製代碼
將這個文件複製到其餘集羣 具體操做
cd /usr/local/hadoop-2.9.2
# 0. 全部節點上啓動journal node
sbin/hadoop-daemon.sh start journalnode
# 1. master1上初始化namenode
bin/hdfs namenode -format
# 2. 在master1上啓動namenode
sbin/hadoop-daemon.sh start namenode
# 3. master2上初始化另外一個namenode
bin/hdfs namenode -bootstrapStandby
# 4. master2上啓動namenode
sbin/hadoop-daemon.sh start namenode
# 5. 啓動zk集羣
$ZOOKEEPER_HOME/bin/zkServer.sh start
# 6. 在任何一個能鏈接上zk的機器上格式化zk(建立hadoop-ha znode節點)
bin/hdfs zkfc -formatZK
# 7. 在兩個namenode上啓動zkfc
sbin/hadoop-daemon.sh --script bin/hdfs start zkfc
複製代碼
打開每個namenode的UI界面,能夠發現至少有一個namenode處於active狀態,另外的處於standby狀態。須要注意的是,處於standby的節點,沒有open權限,也就是說不能查看集羣上的具體文件結構。 這個時候已經啓動了HA集羣,能夠作個實驗:將master1直接kill -9,而後發現master1的namenode進程退出,而master2的狀態由standby變成active。