InputFormat是一個抽象類,其定義以下:app
public abstract class InputFormat<K, V> { public abstract List<InputSplit> getSplits(JobContext context) throws IOException, InterruptedException; public abstract RecordReader<K,V> createRecordReader(InputSplit split, TaskAttemptContext context ) throws IOException, InterruptedException;
InputFormat會對數據進行兩方面的處理:ide
運行做業的客戶端經過調用getSplits()計算分片,而後將它們發送到application master,application master使用其存儲位置信息來調度map任務從而在集羣上處理這些分片數據。map任務把輸入分片傳給InputFormat的createRecordReader()方法來得到這個分片的RecordReader。RecordReader就像是記錄上的迭代器,map任務用一個RecordReader來生成記錄的鍵值對,而後在傳遞給map函數。Mapper的run方法以下:函數
public void run(Context context) throws IOException, InterruptedException { setup(context); try { while (context.nextKeyValue()) { map(context.getCurrentKey(), context.getCurrentValue(), context); } } finally { cleanup(context); } }
另外,輸入格式的類型也由具體的輸入格式進行設置。根據數據源的不一樣,InputFormat由幾個不一樣的子類:oop
類結構層次以下學習
這裏主要學習FileInputFormat。(關於split的介紹見split簡介.md)code
FileInputFormat,也是一個抽象類。是全部使用文件做爲其數據源的InputFormat實現的基類。FileInputFormat提供兩個功能:orm
FileInputFormat類中提供了四種靜態方法來設定Job的輸入路徑:blog
public static void addInputPath(Job job,Path path) throws IOException public static void addInputPaths(Job job, String commaSeparatedPaths) throws IOException public static void setInputPaths(Job job, Path... inputPaths) throws IOException public static void setInputPaths(Job job,String commaSeparatedPaths) throws IOException
其中addInputPath()和addInputPaths()方法能夠將一個或多個路徑加入路徑列表。setInputPaths()方法一次設定完整的路徑列表(會替換前面設置的全部輸入路徑)。繼承
一條路徑能夠表示一個文件、一個目錄或是一個glob(文件和目錄的集合)。路徑時目錄的話,表示要包含這個目錄下的全部文件。但當路徑爲一個目錄時,其內容不會被遞歸處理:若是目錄中包含子目錄,這些子目錄也會被看成文件進行處理,從而引起錯誤。要進行過濾,可使用setInputPathFilter方法加過濾。遞歸
FileInputFormat中實現了父類InputFormat的getSplits方法,用於將文件在邏輯上分片,獲取一個個的split。
FileInputFormat只分割大文件(超過HDFS塊大小)。分片一般與HDFS塊大小同樣,這個值能夠經過設置不一樣的Hadoop屬性來改變。計算公式爲:
max(minimumSize, min(maximumSize, blockSize))
minimumSize表示一個分片中最少的有效字節數,默認爲1。maximumSize表示一個分片中最大的有效字節數,默認爲Long.MAX_VALUE。blockSize是HDFS的塊大小,Hadoop2.0中默認是128M。所以,默認的分片大小就是128M。(split與dataBlock的區別見InputSplit.md)
FileInputFormat中並無具體實現父類中的createRecordReader方法,FileInputFormat只定義了數據的來源,以及如何將數據分片,可是對於如何解析分片,如何將split中的數據轉換爲一條條<Key,Value>形式的記錄,並無一個肯定的規則。即createRecordReader方法須要由它的子類來具體實現。經常使用的子類實現是TextInputFormat。
TextInputFormat是一個實體類,繼承自FileInputFormat,也是默認(沒有顯式設置的時候)的輸入格式。對於split的處理,它產生的鍵類型是LongWritable,存儲該行在整個文件中的偏移;值類型是Text,保存的是這行數據。這是經過在TextInputFormat內部建立了一個LineRecordReader實現的,以下:
@Override public RecordReader<LongWritable, Text> createRecordReader(InputSplit split, TaskAttemptContext context) { String delimiter = context.getConfiguration().get( "textinputformat.record.delimiter"); byte[] recordDelimiterBytes = null; if (null != delimiter) recordDelimiterBytes = delimiter.getBytes(Charsets.UTF_8); return new LineRecordReader(recordDelimiterBytes); }
TextInputFormat的鍵,即每一行在文件中的字節偏移量,一般並非特別有用。一般狀況下,文件中的每一行是一個鍵值對,並使用某個分界符進行分隔,好比製表符。例如由TextOutputFormat(Hadoop默認的OutputFormat)產生的輸出就是這種。要正確處理這類文件,使用KeyValueTextInputFormat比較合適。
能夠經過mapreduce.input.keyvaluelinerecordreader.key.value.spearator屬性來指定分隔符(默認是製表符)。
經過TextInputFormat和KeyValueTextInputFormat,每一個mapper收到的輸入行數不一樣(行的長度不一樣)。若是但願mapper收到固定行數的輸入,須要將NLineInputFormat做爲InputFormat使用。與TextInputFormat同樣,鍵是文件中行的字節偏移量,值是行自己。
本地寫的Markdown放上來仍是不方便,格式什麼的都要再調整一下,有時間得把這個整一下,或者本身搭一個?