分佈式並行計算MapReduce

一、用本身的話闡明Hadoop平臺上HDFS和MapReduce的功能、工做原理和工做過程。

MapReduce是咱們再進行離線大數據處理的時候常常要使用的計算模型,MapReduce的計算過程被封裝的很好,咱們只用使用Map和Reduce函數,因此對其總體的計算過程不是太清楚,同時MapReduce1.0和MapReduce2.0在網上有不少人混淆。java

MapReduce1.0運行模型

 
20170730014216035.png

Input

Input可是輸入文件的存儲位置,python

可是注意這裏並必定是一些博客說的固然是HDFS似的分佈式文件系統位置,默認是HDFS文件系統,固然也能夠修改。

,它也能夠是本機上的文件位置。
咱們來仔細分析下input算法

 
8aab5880-d171-30f7-91d6-aaacba2d03ce.jpg

首先咱們知道要和JobTracker打交道是離不開JobClient這個接口的,就如上圖所示,bash

而後JobClient中的Run方法 會讓 JobClient 把全部 Hadoop Job 的信息,好比 mapper reducer jar path, mapper / reducer class name, 輸入文件的路徑等等,告訴給 JobTracker,以下面的代碼所示:網絡

public int run(String[] args) throws Exception { //create job Job job = Job.getInstance(getConf(), this.getClass().getSimpleName()); // set run jar class job.setJarByClass(this.getClass()); // set input . output FileInputFormat.addInputPath(job, new Path(PropReader.Reader("arg1"))); FileOutputFormat.setOutputPath(job, new Path(PropReader.Reader("arg2"))); // set map job.setMapperClass(HFile2TabMapper.class); job.setMapOutputKeyClass(ImmutableBytesWritable.class); job.setMapOutputValueClass(Put.class); // set reduce job.setReducerClass(PutSortReducer.class); return 0; } 

除此之外,JobClient.runJob() 還會作一件事,使用 InputFormat類去計算如何把 input 文件 分割成一份一份,而後交給 mapper 處理。inputformat.getSplit() 函數返回一個 InputSplit 的 List, 每個 InputSplit 就是一個 mapper 須要處理的數據。app

一個 Hadoop Job的 input 既能夠是一個很大的 file, 也能夠是多個 file; 不管怎樣,getSplit() 都會計算如何分割 input.分佈式

若是是HDFS文件系統,咱們都知道其能夠經過將文件分割爲block的形式存放在不少臺電腦上,使其能夠存放很大的文件。那麼Mapper是如何肯定一個HDFS文件中的block存放哪幾臺電腦,有什麼數據?

inputFormat它其實是個 interface, 須要 類 來繼承,提供分割 input 的邏輯。函數

Jobclient 有一個方法叫 setInputFormat(), 經過它,咱們能夠告訴 JobTracker 想要使用的 InputFormat 類 是什麼。若是咱們不設置,Hadoop默認的是 TextInputFormat, 它默認爲文件在 HDFS上的每個 Block 生成一個對應的 InputSplit. 因此你們使用 Hadoop 時,也能夠編寫本身的 input format, 這樣能夠自由的選擇分割 input 的算法,甚至處理存儲在 HDFS 以外的數據。oop

JobTracker 儘可能把 mapper 安排在離它要處理的數據比較近的機器上,以便 mapper 從本機讀取數據,節省網絡傳輸時間。具體實現是如何實現?

對於每一個 map任務, 咱們知道它的 split 包含的數據所在的主機位置,咱們就把 mapper 安排在那個相應的主機上好了,至少是比較近的host. 你可能會問:split 裏存儲的 主機位置是 HDFS 存數據的主機,和 MapReduce 的主機 有什麼相關呢?爲了達到數據本地性,其實一般把MapReduce 和 HDFS 部署在同一組主機上。測試

既然一個 InputSplit 對應一個 map任務, 那麼當 map 任務收到它所處理數據的位置信息,它就能夠從 HDFS 讀取這些數據了。

接下來咱們再從map函數看Input

map函數接受的是一個 key value 對。

實際上,Hadoop 會把每一個 mapper 的輸入數據再次分割,分割成一個個 key-value對, 而後爲每個 key-value對,調用Map函數一次. 爲了這一步分割,Hadoop 使用到另外一個類: RecordReader. 它主要的方法是 next(), 做用就是從 InputSplit 讀出一條 key-value對.

RecordReader 能夠被定義在每一個 InputFormat 類中。當咱們經過 JobClient.setInputFormat() 告訴 Hadoop inputFormat 類名稱的時候, RecordReader 的定義也一併被傳遞過來。

因此整個Input,

1.JobClient輸入輸入文件的存儲位置

2.JobClient經過InputFormat接口能夠設置分割的邏輯,默認是按HDFS文件分割。

3.Hadoop把文件再次分割爲key-value對。

4.JobTracker負責分配對應的分割塊由對應的maper處理,同時 RecordReader負責讀取key-value對值。

Mapper

JobClient運行後得到所需的配置文件和客戶端計算所得的輸入劃分信息。並將這些信息都存放在JobTracker專門爲該做業建立的文件夾中。文件夾名爲該做業的Job ID。JAR文件默認會有10個副本(mapred.submit.replication屬性控制);

而後輸入劃分信息告訴了JobTracker應該爲這個做業啓動多少個map任務等信息。

JobTracker經過TaskTracker 向其彙報的心跳狀況和slot(狀況),每個slot能夠接受一個map任務,這樣爲了每一臺機器map任務的平均分配,JobTracker會接受每個TaskTracker所監控的slot狀況。

JobTracker接收到做業後,將其放在一個做業隊列裏,等待做業調度器對其進行調度,看成業調度器根據本身的調度算法調度到該做業時,會根據輸入劃分信息爲每一個劃分建立一個map任務,並將map任務分配給TaskTracker執行,分配時根據slot的狀況做爲標準。

TaskTracker每隔一段時間會給JobTracker發送一個心跳,告訴JobTracker它依然在運行,同時心跳中還攜帶着不少的信息,好比當前map任務完成的進度等信息。當JobTracker收到做業的最後一個任務完成信息時,便把該做業設置成「成功」。當JobClient查詢狀態時,它將得知任務已完成,便顯示一條消息給用戶。

Map經過 RecordReader 讀取Input的key/value對,map根據用戶自定義的任務,運行完畢後,產生另一系列 key/value,並將其寫入到Hadoop的內存緩衝取中,在內存緩衝區中的key/value對按key排序,此時會按照reduce partition進行,分到不一樣partition中,一旦內存滿就會被寫入到本地磁盤的文件裏,這個文件叫spill file。

shuffle

Shuffle是咱們不須要編寫的模塊,但倒是十分關鍵的模塊。

 
4df193f5-e56e-308f-9689-eac035dd8a2b.png

在map中,每一個 map 函數會輸出一組 key/value對, Shuffle 階段須要從全部 map主機上把相同的 key 的 key value對組合在一塊兒,(也就是這裏省去的Combiner階段)組合後傳給 reduce主機, 做爲輸入進入 reduce函數裏。

Partitioner組件 負責計算哪些 key 應當被放到同一個 reduce 裏

HashPartitioner類,它會把 key 放進一個 hash函數裏,而後獲得結果。若是兩個 key 的哈希值 同樣,他們的 key/value對 就被放到同一個 reduce 函數裏。咱們也把分配到同一個 reduce函數裏的 key /value對 叫作一個reduce partition.

咱們看到 hash 函數最終產生多少不一樣的結果, 這個 Hadoop job 就會有多少個 reduce partition/reduce 函數,這些 reduce函數最終被JobTracker 分配到負責 reduce 的主機上,進行處理。

咱們知道map階段可能會產生多個spill file 當 Map 結束時,這些 spill file 會被 merge 起來,不是 merge 成一個 file,而是也會按 reduce partition 分紅多個。

當 Map tasks 成功結束時,他們會通知負責的 tasktracker, 而後消息經過 jobtracker 的 heartbeat 傳給 jobtracker. 這樣,對於每個 job, jobtracker 知道 map output 和 map tasks 的關聯。Reducer 內部有一個 thread 負責按期向 jobtracker 詢問 map output 的位置,直到 reducer 獲得全部它須要處理的 map output 的位置。

Reducer 的另外一個 thread 會把拷貝過來的 map output file merge 成更大的 file. 若是 map task 被 configure 成須要對 map output 進行壓縮,那 reduce 還要對 map 結果進行解壓縮。當一個 reduce task 全部的 map output 都被拷貝到一個它的 host上時,reduce 就要開始對他們排序了。

排序並非一次把全部 file 都排序,而是分幾輪。每輪事後產生一個結果,而後再對結果排序。最後一輪就不用產生排序結果了,而是直接向 reduce 提供輸入。這時,用戶提供的 reduce函數 就能夠被調用了。輸入就是 map 任務 產生的 key value對.

同時reduce任務並非在map任務徹底結束後纔開始的,Map 任務有可能在不一樣時間結束,因此 reduce 任務不必等全部 map任務 都結束纔開始。事實上,每一個 reduce任務有一些 threads 專門負責從 map主機複製 map 輸出(默認是5個)。

Reduce

reduce() 函數以 key 及對應的 value 列表做爲輸入,按照用戶本身的程序邏輯,經合併 key 相同的 value 值後,產 生另一系列 key/value 對做爲最終輸出寫入 HDFS

2.HDFS上運行MapReduce

1)查看是否已經安裝python:

2)在/home/hadoop/路徑下創建wc文件夾,在文件夾內新建mapper.py、reducer.py、run.sh和文本文件HarryPotter.txt:

3)查看mapper.py reducer.py run.sh的內容:

 

 

 

4)修改mapper.py和reducer.py文件的權限:

 

5)測試mapper.py和reducer.py:

6)啓動hadoop:

 

7)把文本文件上傳到hdfs:

 

 

8)將hadoop-streaming-2.7.1.jar的路徑添加到bashrc文件而且讓環境變量生效:

 

9)運行run.sh文件統計文本:

相關文章
相關標籤/搜索