MapReduce中TextInputFormat分片和讀取分片數據源碼級分析

  InputFormat主要用於描述輸入數據的格式(咱們只分析新API,即org.apache.hadoop.mapreduce.lib.input.InputFormat),提供如下兩個功能:html

  (1)數據切分:按照某個策略將輸入數據切分紅若干個split,以便肯定MapTask個數以及對應的split;apache

  (2)爲Mapper提供輸入數據:讀取給定的split的數據,解析成一個個的key/value對,供mapper使用。數組

  InputFormat有兩個比較重要的方法:(1)List<InputSplit> getSplits(JobContext job);(2)RecordReader<LongWritable, Text> createRecordReader(InputSplit split,TaskAttemptContext context)。這兩個方法分別對應上面的兩個功能。緩存

  InputSplit分片信息有兩個特色:(1)是邏輯分片,只是在邏輯上對數據進行分片,並不進行物理切分,這點和block是不一樣的,只記錄一些元信息,好比起始位置、長度以及所在的節點列表等;(2)必須可序列化,分片信息要上傳到HDFS文件,還會被JobTracker讀取,序列化能夠方便進程通訊以及永久存儲。架構

  RecordReader對象能夠將輸入數據,即InputSplit對應的數據解析成衆多的key/value,會做爲MapTask的map方法的輸入。app

  咱們本節就以最長使用的TextInputFormat爲列來說解分片和讀取分片數據。ide

  先看繼承關係:(1)public class TextInputFormat extends FileInputFormat;(2)public abstract class FileInputFormat<K, V> extends InputFormat;(3)public abstract class InputFormat。最頂的父類InputFormat只有兩個未實現的抽象方法getSplits和createRecordReader;而FileInputFormat包含的方法比較多,以下圖:oop

,咱們在本身的MR程序中設置輸入目錄就是調用這裏的方法;TextInputFormat這個類只有倆個方法,代碼以下:this

 1 /** An {@link InputFormat} for plain text files.  Files are broken into lines.
 2  * Either linefeed or carriage-return are used to signal end of line.  Keys are
 3  * the position in the file, and values are the line of text.. */
 4 public class TextInputFormat extends FileInputFormat<LongWritable, Text> {
 5 
 6   @Override
 7   public RecordReader<LongWritable, Text> 
 8     createRecordReader(InputSplit split,
 9                        TaskAttemptContext context) {
10     return new LineRecordReader();
11   }
12 
13   @Override
14   protected boolean isSplitable(JobContext context, Path file) {
15     CompressionCodec codec = 
16       new CompressionCodecFactory(context.getConfiguration()).getCodec(file);
17     return codec == null;
18   }
19 
20 }

  isSplitable方法就是是否要切分文件,這個方法顯示若是是壓縮文件就不切分,非壓縮文件就切分。spa

  接下來,咱們只關注上面說的那兩個主要方法,首先來看:

  1、getSplits方法,這個方法在FileInputFormat類中,它的子類通常只須要實現TextInputFormat中的兩個方法而已,getSplits方法代碼以下: 

 1  /** 
 2    * Generate the list of files and make them into FileSplits.
 3    */ 
 4   public List<InputSplit> getSplits(JobContext job
 5                                     ) throws IOException {
 6     long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job));
 7     long maxSize = getMaxSplitSize(job);    //Long.MAX_VALUE
 8 
 9     // generate splits
10     List<InputSplit> splits = new ArrayList<InputSplit>();
11     List<FileStatus>files = listStatus(job);
12     for (FileStatus file: files) {
13       Path path = file.getPath();
14       FileSystem fs = path.getFileSystem(job.getConfiguration());
15       long length = file.getLen();    //整個文件的長度
16       BlockLocation[] blkLocations = fs.getFileBlockLocations(file, 0, length);
17       if ((length != 0) && isSplitable(job, path)) {    //默認是true,可是若是是壓縮的,則是false
18         long blockSize = file.getBlockSize();        //64M,67108864B
19         long splitSize = computeSplitSize(blockSize, minSize, maxSize);    //計算split大小  Math.max(minSize, Math.min(maxSize, blockSize))
20 
21         long bytesRemaining = length;            
22         while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) {
23           int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);
24           splits.add(new FileSplit(path, length-bytesRemaining, splitSize, 
25                                    blkLocations[blkIndex].getHosts()));        //hosts是主機名,name是IP
26           bytesRemaining -= splitSize;        //剩餘塊的大小
27         }
28         
29         if (bytesRemaining != 0) {    //最後一個
30           splits.add(new FileSplit(path, length-bytesRemaining, bytesRemaining, 
31                      blkLocations[blkLocations.length-1].getHosts()));
32         }
33       } else if (length != 0) {    //isSplitable(job, path)等於false
34         splits.add(new FileSplit(path, 0, length, blkLocations[0].getHosts()));
35       } else {
36         //Create empty hosts array for zero length files
37         splits.add(new FileSplit(path, 0, length, new String[0]));
38       }
39     }
40     
41     // Save the number of input files in the job-conf
42     job.getConfiguration().setLong(NUM_INPUT_FILES, files.size());
43 
44     LOG.debug("Total # of splits: " + splits.size());
45     return splits;
46   }
View Code

  (1)minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job)):getFormatMinSplitSize()=1,getMinSplitSize(job)獲取"mapred.min.split.size"指定的大小,默認是1;

  (2)maxSize = getMaxSplitSize(job):getMaxSplitSize(job)獲取"mapred.max.split.size",默認是Long.MAX_VALUE,Long類型的最大值;

  (3)返回輸入目錄下全部文件的FileStatus信息列表;

  (4)而後對每個文件獲取它的目錄、文件的長度、文件對應的全部塊的信息(可能有多個塊,每一個塊對應3個副本);

  (5)而後若是文件長度不爲0且支持分割(isSplitable方法等於true):獲取block大小,默認是64MB,經過方法computeSplitSize(blockSize, minSize, maxSize)計算分片大小splitSize,這個方法Math.max(minSize, Math.min(maxSize, blockSize));而後將bytesRemaining(剩餘未分片字節數)設置爲整個文件的長度,A、若是bytesRemaining超過度片大小splitSize必定量纔會將文件分紅多個InputSplit:bytesRemaining)/splitSize > SPLIT_SLOP(默認1.1),B、就會執行getBlockIndex(blkLocations, length-bytesRemaining)獲取block的索引,第二個參數是這個block在整個文件中的偏移量,在循環中會從0愈來愈大,該方法代碼以下:

 1  protected int getBlockIndex(BlockLocation[] blkLocations, 
 2                               long offset) {
 3     for (int i = 0 ; i < blkLocations.length; i++) {
 4       // is the offset inside this block?
 5       if ((blkLocations[i].getOffset() <= offset) &&
 6           (offset < blkLocations[i].getOffset() + blkLocations[i].getLength())){
 7         return i;
 8       }
 9     }
10     BlockLocation last = blkLocations[blkLocations.length -1];
11     long fileLength = last.getOffset() + last.getLength() -1;
12     throw new IllegalArgumentException("Offset " + offset + 
13                                        " is outside of file (0.." +
14                                        fileLength + ")");
15   }
View Code

  這個方法中的if語句的條件會限制獲取到,這個偏移量對應的block的索引;C、將這個索引對應的block信息的主機節點以及文件的路徑名、開始的便宜量、分片大小splitSize封裝到一個InputSplit中加入List<InputSplit> splits;D、bytesRemaining -= splitSize修改剩餘字節大小;E、返回A中繼續判斷,若是知足即系走BCD,不然跳出循環。若是剩餘bytesRemaining還不爲0,表示還有未分配的數據,將剩餘的數據及最後一個block加入splits。

  (6)若是不容許分割isSplitable==false,則將第一個block、文件目錄、開始位置爲0,長度爲整個文件的長度封裝到一個InputSplit,加入splits中;

  (7)若是文件的長度==0,則splits.add(new FileSplit(path, 0, length, new String[0]))沒有block,而且初始和長度都爲0;

  (8)將輸入目錄下文件的個數賦值給 "mapreduce.input.num.files",方便之後校對;

  (9)返回分片信息splits。

  這就是getSplits獲取分片的過程。當使用基於FileInputFormat實現InputFormat時,爲了提升MapTask的數據本地性,應儘可能使InputSplit大小與block大小相同。

   特殊問題:就是若是分片大小超過bolck大小,可是InputSplit中的封裝了單個block的所在主機信息啊,這樣能讀取多個bolck數據嗎?這個問題咱們留到最後講解。

  2、createRecordReader方法,該方法返回一個RecordReader對象,實現了相似的迭代器功能,將某個InputSplit解析成一個個key/value對。RecordReader應該注意兩點:

  A、定位記錄邊界:爲了能識別一條完整的記錄,應該添加一些同步標示,TextInputFormat的標示是換行符;SequenceFileInputFormat的標示是每隔若干條記錄會添加固定長度的同步字符串。爲了解決InputSplit中第一條或者最後一條可能誇InputSplit的狀況,RecordReader規定每一個InputSplit的第一條不完整記錄劃給前一個InputSplit。

  B、解析key/value:將每一個記錄分解成key和value兩部分,TextInputFormat每一行的內容是value,該行在整個文件中的偏移量爲key;SequenceFileInputFormat的記錄共有四個字段組成:前兩個字段分別是整個記錄的長度和key的長度,均爲4字節,後兩個字段分別是key和value的內容。

  TextInputFormat使用的RecordReader是org.apache.hadoop.mapreduce.lib.input.LineRecordReader。咱們在MapReduce的MapTask任務的運行源碼級分析這篇文章中有介紹過LineRecordReader,initialize方法主要是獲取分片信息的初始位置和結束位置,以及輸入流(如有壓縮則是壓縮流);mapper的key/value是經過LineRecordReader.nextKeyValue()方法將key和value讀取到key和value中的,在這個方法中key被設置爲在文件中的偏移量,value經過LineReader.readLine(value, maxLineLength, Math.max((int)Math.min(Integer.MAX_VALUE, end-pos),maxLineLength))這個方法會讀取一行數據放入value之中,方法代碼以下:

 1  /**
 2    * Read one line from the InputStream into the given Text.  A line
 3    * can be terminated by one of the following: '\n' (LF) , '\r' (CR),
 4    * or '\r\n' (CR+LF).  EOF also terminates an otherwise unterminated
 5    * line.
 6    *
 7    * @param str the object to store the given line (without newline)
 8    * @param maxLineLength the maximum number of bytes to store into str;
 9    *  the rest of the line is silently discarded.
10    * @param maxBytesToConsume the maximum number of bytes to consume
11    *  in this call.  This is only a hint, because if the line cross
12    *  this threshold, we allow it to happen.  It can overshoot
13    *  potentially by as much as one buffer length.
14    *
15    * @return the number of bytes read including the (longest) newline
16    * found.
17    *
18    * @throws IOException if the underlying stream throws
19    */
20   public int readLine(Text str, int maxLineLength,
21                       int maxBytesToConsume) throws IOException {
22     /* We're reading data from in, but the head of the stream may be
23      * already buffered in buffer, so we have several cases:
24      * 1. No newline characters are in the buffer, so we need to copy
25      *    everything and read another buffer from the stream.
26      * 2. An unambiguously terminated line is in buffer, so we just
27      *    copy to str.
28      * 3. Ambiguously terminated line is in buffer, i.e. buffer ends
29      *    in CR.  In this case we copy everything up to CR to str, but
30      *    we also need to see what follows CR: if it's LF, then we
31      *    need consume LF as well, so next call to readLine will read
32      *    from after that.
33      * We use a flag prevCharCR to signal if previous character was CR
34      * and, if it happens to be at the end of the buffer, delay
35      * consuming it until we have a chance to look at the char that
36      * follows.
37      */
38     str.clear();
39     int txtLength = 0; //tracks str.getLength(), as an optimization
40     int newlineLength = 0; //length of terminating newline
41     boolean prevCharCR = false; //true of prev char was CR
42     long bytesConsumed = 0;
43     do {
44       int startPosn = bufferPosn; //starting from where we left off the last time
45       if (bufferPosn >= bufferLength) {
46         startPosn = bufferPosn = 0;
47         if (prevCharCR)
48           ++bytesConsumed; //account for CR from previous read
49         bufferLength = in.read(buffer);    //從輸入流中讀取必定數量的字節,並將其存儲在緩衝區數組 b 中。以整數形式返回實際讀取的字節數。
50         if (bufferLength <= 0)        //結束了,沒數據了
51           break; // EOF
52       }
53       //'\n',ASCII碼:10,意義:換行NL;;;;'\r' ,ASCII碼:13,意義: 回車CR
54       for (; bufferPosn < bufferLength; ++bufferPosn) { //search for newline
55         if (buffer[bufferPosn] == LF) {            //若是是換行字符\n
56           newlineLength = (prevCharCR) ? 2 : 1;
57           ++bufferPosn; // at next invocation proceed from following byte,越過換行字符
58           break;
59         }
60         if (prevCharCR) { //CR + notLF, we are at notLF,若是是回車字符\r
61           newlineLength = 1;
62           break;
63         }
64         prevCharCR = (buffer[bufferPosn] == CR);
65       }
66       int readLength = bufferPosn - startPosn;
67       if (prevCharCR && newlineLength == 0)    //表示還沒遇到換行,有回車字符,且緩存最後一個是\r
68         --readLength; //CR at the end of the buffer
69       bytesConsumed += readLength;
70       int appendLength = readLength - newlineLength;    //newlineLength換行符個數
71       if (appendLength > maxLineLength - txtLength) {
72         appendLength = maxLineLength - txtLength;
73       }
74       if (appendLength > 0) {
75         str.append(buffer, startPosn, appendLength);    //將數據加入str
76         txtLength += appendLength;
77       }
78     } while (newlineLength == 0 && bytesConsumed < maxBytesToConsume);//循環條件沒有換行而且沒超過上限
79 
80     if (bytesConsumed > (long)Integer.MAX_VALUE)
81       throw new IOException("Too many bytes before newline: " + bytesConsumed);    
82     return (int)bytesConsumed;
83   }
View Code

  這個方法目的就是讀取一行記錄寫入str中。bytesConsumed記錄這讀取的字節總數;bufferLength = in.read(buffer)從輸入流讀取bufferLength字節的數據放入buffer中;do-while中開始部分的if語句是要保證將bufferLength個字節數據處理完畢以後再從輸入流中讀取下一批數據;newlineLength表示換行的標記符長度(0,1,2三種值),由於不一樣的系統換行標記可能不一樣,有三種:\r(回車符)、\n(換行符)、\r\n(\n:  UNIX 系統行末結束符;\r\n: window 系統行末結束符;\r:  MAC OS 系統行末結束符);for循環會挨個檢查字符是不是\r或者\n,若是是回車符\r,還會將prevCharCR設置爲true,當前字符若是是換行符\n,prevCharCR==true時(表示上一個字符是回車符\r)則newlineLength=2(這代表當前系統的換行標記是\r\n),prevCharCR==false時(表示上一個字符不是回車符\r)則newlineLength=1(這代表當前系統的換行標記是\n),並退出for循環;若是當前字符不是換行符\n且prevCharCR==true(代表當前系統的換行標記是\r)則newlineLength = 1並退出for循環;這樣就找到了換行的標記,而後計算數據的長度appendLength(不包括換行符),將buffer中指定位置開始長度爲appendLength的數據追加到str(這裏實際上是value)中;txtLength表示的是str(這裏實際上是value中值的長度);do-while循環的條件是:一、沒有發現換行標記newlineLength == 0;二、讀取的字節數量沒有超過上限bytesConsumed < maxBytesToConsume,這倆條件要同時知足。這其中有個問題就是當前系統的換行標記是\r\n,可是這兩個字符沒有同時出如今此次讀取的數據之中,\n在下一個批次之中,這不要緊,上面的for循環會檢查\r出現以後的下一個字符是不是\n再對newlineLength進行設置的。從這個方法能夠看出,即便是記錄跨split、跨block也不能阻止它完整讀取一行數據的決心啊。

  再返回查看LineRecordReader.nextKeyValue()方法,這個方法代碼以下:  

 1 public boolean nextKeyValue() throws IOException {
 2     if (key == null) {
 3       key = new LongWritable();
 4     }
 5     key.set(pos);
 6     if (value == null) {
 7       value = new Text();
 8     }
 9     int newSize = 0;
10     while (pos < end) {
11       newSize = in.readLine(value, maxLineLength,
12                             Math.max((int)Math.min(Integer.MAX_VALUE, end-pos),
13                                      maxLineLength));
14       if (newSize == 0) {
15         break;
16       }
17       pos += newSize;
18       if (newSize < maxLineLength) {
19         break;
20       }
21 
22       // line too long. try again
23       LOG.info("Skipped line of size " + newSize + " at pos " + 
24                (pos - newSize));
25     }
26     if (newSize == 0) {
27       key = null;
28       value = null;
29       return false;
30     } else {
31       return true;
32     }
33   }
View Code

  這個方法會控制split的讀取數據的結束位置,上面的readLine方法只關注輸入流不會管split的大小的。須要注意的是其中的while循環,其中的pos和end表示當前在文件中的偏移量和split的結束位置,即便這個split的最後一行跨split也會完整的獲取一行。也就保證了一個記錄的完整性。mapper獲取key/value會經過調用getCurrentKey()和getCurrentValue()來達到的,可是調用這倆方法前得先調用nextKeyValue()方法才能實現key和value的賦值。

  

  到這咱們回頭看看上面的那個特殊問題,就是split的大小超過block的大小數據讀取的問題,咱們前面已經講過split是邏輯分片,不是物理分片,當MapTask的數據本地性發揮做用時,會從本機的block開始讀取,超過這個block的部分可能還在本機也可能不在本機,若是是後者的話就要從別的節點拉數據過來,由於實際獲取數據是一個輸入流,這個輸入流面向的是整個文件,不受什麼block啊、split的影響,split的大小越大可能須要從別的節點拉的數據越多,從從而效率也會越慢,拉數據的多少是由getSplits方法中的splitSize決定的。因此爲了更有效率,應該遵循上面的黑體字。

 

  至此,TextInputFormat的分片和數據讀取過程講完了。這只是一個例子,其餘InputFormat能夠參考這個。

 

  參考:一、董西成,《hadoop技術內幕---深刻理解MapReduce架構設計與實現原理》

       二、http://www.cnblogs.com/clarkchen/archive/2011/06/02/2068609.html

相關文章
相關標籤/搜索