簡單解析mapreduce切片

  在mapreduce中的切片是什麼意思?顧名思義就是將數據進行切分,切分爲數據片,其實這個切片關乎於map階段的map個數,以及每一個map處理的數據量的大小。node

  mapreduce中,一個job的map個數, 每一個map處理的數據量是如何決定的呢? 另外每一個map又是如何讀取輸入文件的內容呢? 用戶是否能夠本身決定輸入方式, 決定map個數呢? 編程

  mapreduce做業會根據輸入目錄產生多個map任務, 經過多個map任務並行執行來提升做業運行速度, 但若是map數量過少, 並行量低, 做業執行慢, 若是map數過多, 資源有限,也會增長調度開銷. 所以, 根據輸入產生合理的map數, 爲每一個map分配合適的數據量, 能有效的提高資源利用率, 並使做業運行速度加快。數組

  在mapreduce中, 每一個做業都會經過 InputFormat來決定map數量,oop

InputFormat是一個接口, 提供兩個方法:code

InputSplit[] getSplits(JobConf job, int numSplits) throws IOException;

RecordReader<K, V> getRecordReader(InputSplit split,JobConf job,Reporter reporter) throws IOException;

  其中getSplits方法會根據輸入目錄產生InputSplit數組, 每一個InputSplit會相應產生一個map任務, map的輸入定義在InputSplit中. getRecordReader方法返回一個RecordReader對象, RecordReader決定了map任務如何讀取輸入數據, 例如一行一行的讀取仍是一個字節一個字節的讀取, 等等。而咱們大多數其實在使用TextInputFormat,但TextInputFormat沒有實現本身的getSplits方法,它繼承於FileInputFormat, 所以使用了FileInputFormat的getSplits方法.orm

  舊接口FileInputFormat的getSplits流程包含兩個參數和一個公式:對象

mapred.min.split.size        (一個map最小輸入長度)
mapred.map.tasks                (推薦map數量)
Math.max(minSize, Math.min(goalSize, blockSize))

  如何決定每一個map輸入長度呢? 首先獲取輸入目錄下全部文件的長度和, 除以mapred.map.tasks(這個屬性是咱們能夠再程序中設置的)獲得一個推薦長度goalSize, 而後經過公式: Math.max(minSize, Math.min(goalSize, blockSize))決定map輸入長度。 這裏的minSize爲mapred.min.split.size, 這個屬性是咱們在配置文件中配置的,blockSize爲相應文件的block長度。這公式能保證一個map的輸入至少大於mapred.min.split.size, 對於推薦的map長度,只有它的長度小於blockSize且大於mapred.min.split.size纔會有效果. 因爲mapred.min.split.size默認長度爲1, 所以一般狀況下只要小於blockSize就有效果,不然使用blockSize作爲map輸入長度.繼承

  所以, 若是想增長map數, 能夠把mapred.min.split.size調小(其實默認值便可), 另外還須要把mapred.map.tasks設置大.若是須要減小map數,能夠把mapred.min.split.size調大, 另外把mapred.map.tasks調小.接口

  這裏要特別指出的是FileInputFormat會讓每一個輸入文件至少產生一個map任務, 所以若是你的輸入目錄下有許多文件, 而每一個文件都很小, 例如幾十kb, 那麼每一個文件都產生一個map會增長調度開銷. 做業變慢.資源

  那麼如何防止這種問題呢? CombineFileInputFormat能有效的減小map數量。

  Hadoop 0.20開始定義了一套新的mapreduce編程接口, 使用新的FileInputFormat, 它與舊接口下的FileInputFormat主要區別在於, 它再也不使用mapred.map.tasks, 而使用mapred.max.split.size參數代替goalSize, 經過Math.max(minSize, Math.min(maxSize, blockSize))決定map輸入長度, 一個map的輸入要大於minSize,小於Math.min(maxSize, blockSize)。

  若需增長map數,能夠把mapred.min.split.size調小,把mapred.max.split.size調大。 若需減小map數, 能夠把mapred.min.split.size調大, 並把mapred.max.split.size調小。

CombineFileInputFormat

  顧名思義, CombineFileInputFormat的做用是把許多文件合併做爲一個map的輸入.

  在它以前,可使用MultiFileInputFormat,不過其功能太簡單, 它以文件爲單位,一個文件至多分給一個map處理, 若是某個目錄下有許多小文件, 另外還有一個超大文件, 處理大文件的map會嚴重偏慢.

  CombineFileInputFormat是一個被推薦使用的InputFormat. 它有三個配置:

mapred.min.split.size.per.node, 一個節點上split的至少的大小
mapred.min.split.size.per.rack   一個交換機下split至少的大小
mapred.max.split.size             一個split最大的大小

  它的主要思路是把輸入目錄下的大文件分紅多個map的輸入, 併合並小文件, 作爲一個map的輸入. 具體的原理是下述三步:

  1. 根據輸入目錄下的每一個文件,若是其長度超過mapred.max.split.size,以block爲單位分紅多個split(一個split是一個map的輸入),每一個split的長度都大於mapred.max.split.size,
    由於以block爲單位, 所以也會大於blockSize,
    此文件剩下的長度若是大於mapred.min.split.size.per.node, 則生成一個split, 不然先暫時保留.
  2. 如今剩下的都是一些長度效短的碎片,把每一個rack下碎片合併, 只要長度超過mapred.max.split.size就合併成一個split, 最後若是剩下的碎片比mapred.min.split.size.per.rack大, 就合併成一個split, 不然暫時保留.
  3. 把不一樣rack下的碎片合併, 只要長度超過mapred.max.split.size就合併成一個split, 剩下的碎片不管長度, 合併成一個split.

舉例:

mapred.max.split.size=1000
mapred.min.split.size.per.node=300
 mapred.min.split.size.per.rack=100

  輸入目錄下五個文件,rack1下三個文件,長度爲2050,1499,10, rack2下兩個文件,長度爲1010,80. 另外blockSize爲500.

  通過第一步, 生成五個split: 1000,1000,1000,499,1000. 剩下的碎片爲rack1下:50,10; rack2下10:80

  因爲兩個rack下的碎片和都不超過100, 因此通過第二步, split和碎片都沒有變化.

  第三步,合併四個碎片成一個split, 長度爲150.

  若是要減小map數量, 能夠調大mapred.max.split.size, 不然調小便可.

  其特色是: 一個塊至多做爲一個map的輸入,一個文件可能有多個塊,一個文件可能由於塊多分給作爲不一樣map的輸入, 一個map可能處理多個塊,可能處理多個文件。

相關文章
相關標籤/搜索