hadoop中InputFormat 接口的設計與實現

InputFormat 主要用於描述輸入數據的格式, 它提供如下兩個功能。
❑數據切分:按照某個策略將輸入數據切分紅若干個 split, 以便肯定 Map Task 個數以及對應的 split。
❑爲 Mapper 提供輸入數據: 給定某個 split, 能將其解析成一個個 key/value 對。
本文將介紹 Hadoop 如何設計 InputFormat 接口,以及提供了哪些經常使用的 InputFormat實現。java

1 .舊版 API 的 InputFormat 解析

如圖所示:node

在舊版 API 中, InputFormat 是一個接口 , 它包含兩種方法:算法

InputSplit[] getSplits(JobConf job, int numSplits) throws IOException;
RecordReader<K, V> getRecordReader(InputSplit split, JobConf job, Reporter reporter) throws IOException;

getSplits 方法主要完成數據切分的功能, 它會嘗試着將輸入數據切分紅 numSplits 個InputSplit。 InputSplit 有如下兩個特色。
邏輯分片它只是在邏輯上對輸入數據進行分片, 並不會在磁盤上將其切分紅分片進行存儲。 InputSplit 只記錄了分片的元數據信息,好比起始位置、長度以及所在的
節點列表等。
可序列化:在 Hadoop 中,對象序列化主要有兩個做用:進程間通訊和永久存儲。 此處,InputSplit 支持序列化操做主要是爲了進程間通訊。 做業被提交到 JobTracker 以前,Client 會調用做業 InputFormat 中的 getSplits 函數, 並將獲得的 InputSplit 序列化到文件中。這樣,看成業提交到 JobTracker 端對做業初始化時,可直接讀取該文件,解析出全部 InputSplit, 並建立對應的 MapTask。數據庫

getRecordReader 方法返回一個RecordReader 對象,該對象可將輸入的 InputSplit解析成若干個 key/value 對。 MapReduce 框架在 MapTask 執行過程當中,會不斷調用RecordReader 對象中的方法, 迭代獲取 key/value 對並交給 map() 函數處理, 主要代碼(通過簡化)以下:網絡

//調用 InputSplit 的 getRecordReader 方法獲取 RecordReader<K1, V1> input
……
K1 key = input.createKey();
V1 value = input.createValue();
while (input.next(key, value)) {
//調用用戶編寫的 map() 函數
}
input.close();

前面分析了 InputFormat 接口的定義, 接下來介紹系統自帶的各類 InputFormat 實現。爲了方便用戶編寫 MapReduce 程序, Hadoop 自帶了一些針對數據庫和文件的 InputFormat實現, 具體如圖所示。一般而言用戶須要處理的數據均以文件形式存儲到 HDFS 上,因此這裏重點針對文件的 InputFormat 實現進行討論。

如圖所示, 全部基於文件的 InputFormat 實現的基類是 FileInputFormat, 並由此派生出針對文本文件格式的 TextInputFormat、 KeyValueTextInputFormat 和 NLineInputFormat,針對二進制文件格式的 SequenceFileInputFormat 等。 整個基於文件的 InputFormat 體系的設計思路是,由公共基類FileInputFormat 採用統一的方法 對各類輸入文件進行切分,好比按照某個固定大小等分,而由各個派生 InputFormat 本身提供機制將進一步解析InputSplit。 對應到具體的實現是,基類 FileInputFormat 提供 getSplits 實現, 而派生類提供getRecordReader 實現。架構

爲了深刻理解這些 InputFormat 的實現原理, 選取extInputFormat 與SequenceFileInputFormat 進行重點介紹。app

首先介紹基類FileInputFormat的實現。它最重要的功能是爲各類 InputFormat 提供統一的getSplits 函數。該函數實現中最核心的兩個算法是文件切分算法和 host 選擇算法。
(1) 文件切分算法
文件切分算法主要用於肯定 InputSplit 的個數以及每一個 InputSplit 對應的數據段。FileInputFormat 以文件爲單位切分生成 InputSplit。 對於每一個文件, 由如下三個屬性值肯定其對應的 InputSplit 的個數。
❑goalSize : 它是根據用戶指望的 InputSplit 數目計算出來的, 即 totalSize/numSplits。其中, totalSize 爲文件總大小; numSplits 爲用戶設定的 MapTask 個數, 默認狀況下是 1。
❑minSize: InputSplit 的最小值, 由配置參數 mapred.min.split.size 肯定, 默認是 1。
❑blockSize: 文件在 HDFS 中存儲的 block 大小, 不一樣文件可能不一樣, 默認是 64 MB。這三個參數共同決定 InputSplit 的最終大小, 計算方法以下:
splitSize = max{minSize, min{goalSize, blockSize}}
一旦肯定 splitSize 值後, FileInputFormat 將文件依次切成大小爲 splitSize 的 InputSplit,最後剩下不足 splitSize 的數據塊單獨成爲一個 InputSplit。
【實例】 輸入目錄下有三個文件 file一、file2 和 file3,大小依次爲 1 MB,32 MB 和250 MB。 若 blockSize 採用 默認值 64 MB, 則不一樣 minSize 和 goalSize 下, file3 切分結果如表所示(三種狀況下, file1 與 file2 切分結果相同, 均爲 1 個 InputSplit)。
表-minSize、 goalSize、 splitSize 與 InputSplit 對應關係框架

minSize  goalSize  splitSize  file3 對應的 InputSplit 數目  輸入目 錄對應的 InputSplit 總數
1 MB  totalSize
(numSplits=1 )
64 MB  4 6
32 MB  totalSize/5 50 MB 5 7
128 MB  totalSize/2 128 MB 2 4

結合表和公式能夠知道, 若是想讓 InputSplit 尺寸大於 block 尺寸, 則直接增大配置參數 mapred.min.split.size 便可。
(2) host 選擇算法
待 InputSplit 切分方案肯定後,下一步要肯定每一個 InputSplit 的元數據信息。 這一般由四部分組成:<file, start, length, hosts>, 分別表示 InputSplit 所在的文件、起始位置、長度以及所在的 host(節點)列表。 其中,前三項很容易肯定,難點在於 host 列表的選擇方法。函數

InputSplit 的 host 列表選擇策略直接影響到運行過程當中的任務本地性。 HDFS 上的文件是以 block 爲單位組織的,一個大文件對應的block 可能遍及整個 Hadoop 集羣, 而 InputSplit 的劃分算法可能致使一個 InputSplit 對應多個 block , 這些 block 可能位於不一樣節點上, 這使得 Hadoop 不可能實現徹底的數據本地性。爲此,Hadoop 將數據本地性按照代價劃分紅三個等級:node locality、rack locality 和 datacenter locality(Hadoop 還未實現該 locality 級別)。在進行任務調度時, 會依次考慮這 3 個節點的 locality, 即優先讓空閒資源處理本節點上的數據,若是節點上沒有可處理的數據,則處理同一個機架上的數據, 最差狀況是處理其餘機架上的數據(可是必須位於同一個數
據中心)。
雖然 InputSplit 對應的 block 可能位於多個節點上, 但考慮到任務調度的效率,一般不會把全部節點加到 InputSplit 的 host 列表中,而是選擇包含(該 InputSplit)數據總量最大的前幾個節點(Hadoop 限制最多選擇 10 個,多餘的會過濾掉),以做爲任務調度時判斷任務是否具備本地性的主要憑證。爲此,FileInputFormat 設計了一個簡單有效的啓發式算法 :首先按照 rack 包含的數據量對 rack 進行排序, 而後在 rack 內部按照每一個 node 包含的數據量對 node 排序, 最後取前 N個node 的 host 做爲InputSplit 的 host 列表, 這裏的 N爲 block副本數。這樣,當任務調度器調度 Task 時,只要將 Task 調度給位於 host 列表的節點,就認爲該 Task 知足本地性。oop

【實例】某個 Hadoop 集羣的網絡拓撲結構如圖所示, HDFS中block 副本數爲3,某個InputSplit 包含 3 個 block,大小依次是100、150 和 75,很容易計算,4 個rack 包
含的(該 InputSplit 的)數據量分別是17五、250、150 和 75。rack2 中的 node3 和 node4,rack1 中的 node1 將被添加到該 InputSplit 的 host 列表中。

從以上 host 選擇算法可知, 當 InputSplit 尺寸大於 block 尺寸時, Map Task 並不能實現徹底數據本地性, 也就是說, 總有一部分數據須要從遠程節點上讀取, 於是能夠得出如下結論:

當使用基於 FileInputFormat 實現 InputFormat 時, 爲了提升 Map Task 的數據本地性,應儘可能使 InputSplit 大小與 block 大小相同。
分析完 FileInputFormat 實現方法, 接下來分析派生類 TextInputFormat 與 SequenceFileInputFormat 的實現。前面提到, 由派生類實現 getRecordReader 函數, 該函數返回一個 RecordReader 對象。它實現了相似於迭代器的功能, 將某個 InputSplit 解析成一個個 key/value 對。在具體實現時, RecordReader 應考慮如下兩點:

❑定位記錄邊界:爲了可以識別一條完整的記錄,記錄之間應該添加一些同步標識。對於 TextInputFormat, 每兩條記錄之間存在換行符;對於 SequenceFileInputFormat,每隔若干條記錄會添加固定長度的同步字符串。 經過換行符或者同步字符串, 它們很容易定位到一個完整記錄的起始位置。另外,因爲FileInputFormat 僅僅按照數據量多少對文件進行切分, 於是 InputSplit 的第一條記錄和最後一條記錄可能會被從中間切開。 爲了解決這種記錄跨越 InputSplit 的讀取問 題, RecordReader 規定每一個InputSplit 的第一條不完整記錄劃給前一個 InputSplit 處理。

❑解析 key/value:定位到一條新的記錄後, 需將該記錄分解成 key 和 value 兩部分。對於TextInputFormat, 每一行的內容即爲 value,而該行在整個文件中的偏移量爲key。對於 SequenceFileInputFormat, 每條記錄的格式爲:
[record length] [key length] [key] [value]
其中, 前兩個字段分別是整條記錄的長度和 key 的長度, 均爲 4 字節, 後兩個字段分別是 key 和 value 的內容。 知道每條記錄的格式後, 很容易解析出 key 和 value。

2. 新版 API 的 InputFormat 解析

新版API的InputFormat 類圖如圖所示。新 API 與舊 API 比較,在形式上發生了較大變化,但仔細分析,發現僅僅是對以前的一些類進行了封裝。 正如前面介紹的那樣,經過封裝,使接口的易用性和擴展性得以加強。


public abstract class InputFormat<K, V> {
      public abstract 
        List<InputSplit> getSplits(JobContext context
                                   ) throws IOException, InterruptedException;
      public abstract 
        RecordReader<K,V> createRecordReader(InputSplit split,
                                             TaskAttemptContext context
                                            ) throws IOException, 
                                                     InterruptedException;
}

查看InputSplit.java文件源代碼:

public abstract class InputSplit {
  /**
   * 獲取split的大小, 這樣就能將輸入的splits按照大小排序.
   * @return split的字節大小
   * @throws IOException
   * @throws InterruptedException
   */
  public abstract long getLength() throws IOException, InterruptedException;
  /**
   * 經過name獲取那些及將定位的nodes列表,其中的數據爲split準備
   * 位置沒必要序列化
   * @return a new array of the node nodes.
   * @throws IOException
   * @throws InterruptedException
   */
  public abstract 
    String[] getLocations() throws IOException, InterruptedException;
}

此外, 對於基類 FileInputFormat, 新版 API 中有一個值得注意的改動 : InputSplit 劃分算法再也不考慮用戶設定的 Map Task 個數, 而用 mapred.max.split.size( 記爲 maxSize) 代替,即 InputSplit 大小的計算公式變爲:
splitSize = max{minSize, min{maxSize, blockSize}}

參考資料

《Hadoop技術內幕 深刻理解MapReduce架構設計與實現原理》

相關文章
相關標籤/搜索