咱們在MapReduce中TextInputFormat分片和讀取分片數據源碼級分析 這篇中以TextInputFormat爲例講解了InputFormat的分片過程以及RecordReader讀取分片數據的過程。接下來我們分析TableInputFormat的分片信息和數據讀取過程。html
TableInputFormat這是專門處理基於HBase的MapReduce的輸入數據的格式類。咱們能夠看看繼承結構:(1)public class TableInputFormat extends TableInputFormatBase implements Configurable;(2)public abstract class TableInputFormatBase extends InputFormat<ImmutableBytesWritable, Result>。其中InputFormat是輸入格式的基類。apache
TableInputFormat類主要是構造HTable對象和Scan對象,主要在方法setConf(Configuration configuration)構造,代碼以下:數組
1 /** 2 * Sets the configuration. This is used to set the details for the table to 3 * be scanned. 4 * 5 * @param configuration The configuration to set. 6 * @see org.apache.hadoop.conf.Configurable#setConf( 7 * org.apache.hadoop.conf.Configuration) 8 */ 9 @Override 10 public void setConf(Configuration configuration) { 11 this.conf = configuration; 12 String tableName = conf.get(INPUT_TABLE); 13 try { 14 setHTable(new HTable(new Configuration(conf), tableName)); 15 } catch (Exception e) { 16 LOG.error(StringUtils.stringifyException(e)); 17 } 18 19 Scan scan = null; 20 21 if (conf.get(SCAN) != null) { 22 try { 23 scan = TableMapReduceUtil.convertStringToScan(conf.get(SCAN)); 24 } catch (IOException e) { 25 LOG.error("An error occurred.", e); 26 } 27 } else { 28 try { 29 scan = new Scan(); 30 31 if (conf.get(SCAN_ROW_START) != null) { 32 scan.setStartRow(Bytes.toBytes(conf.get(SCAN_ROW_START))); 33 } 34 35 if (conf.get(SCAN_ROW_STOP) != null) { 36 scan.setStopRow(Bytes.toBytes(conf.get(SCAN_ROW_STOP))); 37 } 38 39 if (conf.get(SCAN_COLUMNS) != null) { 40 addColumns(scan, conf.get(SCAN_COLUMNS)); 41 } 42 43 if (conf.get(SCAN_COLUMN_FAMILY) != null) { 44 scan.addFamily(Bytes.toBytes(conf.get(SCAN_COLUMN_FAMILY))); 45 } 46 47 if (conf.get(SCAN_TIMESTAMP) != null) { 48 scan.setTimeStamp(Long.parseLong(conf.get(SCAN_TIMESTAMP))); 49 } 50 51 if (conf.get(SCAN_TIMERANGE_START) != null && conf.get(SCAN_TIMERANGE_END) != null) { 52 scan.setTimeRange( 53 Long.parseLong(conf.get(SCAN_TIMERANGE_START)), 54 Long.parseLong(conf.get(SCAN_TIMERANGE_END))); 55 } 56 57 if (conf.get(SCAN_MAXVERSIONS) != null) { 58 scan.setMaxVersions(Integer.parseInt(conf.get(SCAN_MAXVERSIONS))); 59 } 60 61 if (conf.get(SCAN_CACHEDROWS) != null) { 62 scan.setCaching(Integer.parseInt(conf.get(SCAN_CACHEDROWS))); 63 } 64 65 // false by default, full table scans generate too much BC churn 66 scan.setCacheBlocks((conf.getBoolean(SCAN_CACHEBLOCKS, false))); 67 } catch (Exception e) { 68 LOG.error(StringUtils.stringifyException(e)); 69 } 70 } 71 72 setScan(scan); 73 }
首先會經過配置信息獲取HBase表名,而後構造一個HTable對象;若是用戶本身的做業中有配置Scan的話,就會解碼Scan字符串轉換爲Scan對象;若是用戶沒配置Scan就會建立一個默認的Scan,進行一些基本配置。app
關於TableInputFormatBase,咱們重點仍是講兩個方法:RecordReader<ImmutableBytesWritable, Result> createRecordReader(InputSplit split, TaskAttemptContext context)方法和List<InputSplit> getSplits(JobContext context)方法,前者是讀取分片信息所指的數據供TableMapper處理,後者是構造HBase表的分片信息。這裏的分片信息是TableSplit extends InputSplit implements Writable, Comparable,這個TableSpli維護4個字段,HBase表名:byte [] tableName、scan起始rowkey:byte [] startRow、scan結束rowkey:byte [] endRow、以及該region所在節點:String regionLocation。ide
1、先看getSplits方法吧,這裏一個split通常對應一個完整region,除非用戶設定的開始和結束rowkey不是region的邊界,代碼以下:oop
1 /** 2 * Calculates the splits that will serve as input for the map tasks. The 3 * number of splits matches the number of regions in a table. 4 * 5 * @param context The current job context. 6 * @return The list of input splits. 7 * @throws IOException When creating the list of splits fails. 8 * @see org.apache.hadoop.mapreduce.InputFormat#getSplits( 9 * org.apache.hadoop.mapreduce.JobContext) 10 */ 11 @Override 12 public List<InputSplit> getSplits(JobContext context) throws IOException { 13 if (table == null) { 14 throw new IOException("No table was provided."); 15 } 16 // Get the name server address and the default value is null. 17 this.nameServer = 18 context.getConfiguration().get("hbase.nameserver.address", null); 19 20 Pair<byte[][], byte[][]> keys = table.getStartEndKeys();//獲取全部Region的開始rowkey和結束rowkey 21 if (keys == null || keys.getFirst() == null || 22 keys.getFirst().length == 0) { // 23 //table的第一個region的startKey必須是EMPTY_BYTE_ARRAY,不然輸出FIRST_REGION_STARTKEY_NOT_EMPTY信息 24 HRegionLocation regLoc = table.getRegionLocation( 25 HConstants.EMPTY_BYTE_ARRAY, false); 26 if (null == regLoc) { //一個region也沒有 27 throw new IOException("Expecting at least one region."); 28 } 29 List<InputSplit> splits = new ArrayList<InputSplit>(1); 30 //構造一個TableSplit,起始rowkey和結束rowkey都是EMPTY_BYTE_ARRAY 31 InputSplit split = new TableSplit(table.getTableName(), 32 HConstants.EMPTY_BYTE_ARRAY, HConstants.EMPTY_BYTE_ARRAY, regLoc 33 .getHostnamePort().split(Addressing.HOSTNAME_PORT_SEPARATOR)[0]); 34 splits.add(split); 35 return splits; //返回分片信息 36 } 37 List<InputSplit> splits = new ArrayList<InputSplit>(keys.getFirst().length); 38 for (int i = 0; i < keys.getFirst().length; i++) { //有多個region 39 if ( !includeRegionInSplit(keys.getFirst()[i], keys.getSecond()[i])) { //這個方法一直返回true 40 continue; 41 } 42 HServerAddress regionServerAddress = 43 table.getRegionLocation(keys.getFirst()[i]).getServerAddress(); 44 InetAddress regionAddress = 45 regionServerAddress.getInetSocketAddress().getAddress();//獲取region所在地址 46 String regionLocation; 47 try { 48 regionLocation = reverseDNS(regionAddress);//將地址轉換爲字符串的主機名 49 } catch (NamingException e) { 50 LOG.error("Cannot resolve the host name for " + regionAddress + 51 " because of " + e); 52 regionLocation = regionServerAddress.getHostname(); 53 } 54 55 byte[] startRow = scan.getStartRow(); //獲取scan的開始rowkey 56 byte[] stopRow = scan.getStopRow(); //獲取scan的結束rowkey 57 // determine if the given start an stop key fall into the region 58 //比較用戶設定的rowkey範圍在那些region之中 59 if ((startRow.length == 0 || keys.getSecond()[i].length == 0 || 60 Bytes.compareTo(startRow, keys.getSecond()[i]) < 0) && 61 (stopRow.length == 0 || 62 Bytes.compareTo(stopRow, keys.getFirst()[i]) > 0)) { 63 byte[] splitStart = startRow.length == 0 || 64 Bytes.compareTo(keys.getFirst()[i], startRow) >= 0 ? 65 keys.getFirst()[i] : startRow; 66 byte[] splitStop = (stopRow.length == 0 || 67 Bytes.compareTo(keys.getSecond()[i], stopRow) <= 0) && 68 keys.getSecond()[i].length > 0 ? 69 keys.getSecond()[i] : stopRow; 70 InputSplit split = new TableSplit(table.getTableName(), 71 splitStart, splitStop, regionLocation); //構造TableSplit 72 splits.add(split); 73 if (LOG.isDebugEnabled()) { 74 LOG.debug("getSplits: split -> " + i + " -> " + split); 75 } 76 } 77 } 78 return splits; 79 }
Pair這個結構由兩部分組成:start和end,都是byte[][],這是一個二維byte數組,其中一維是region的順序,二維是start或者end的字符序列。經過getFirst方法就能夠獲取全部region的start rowkey,經過getSecond能夠得到全部的end rowkey。ui
這裏還有一個要注意的就是每一個HBase表的第一個region是沒有start rowkey,最後一個region是沒有end rowkey,這裏的「沒有」是指的是table.getStartEndKeys()這個方法獲取的結果,另外大夥也能夠從WEB UI中查看指定的HBase表的region信息也能夠看到第一個region的start和最後一個region的end並無顯示。還有就是若是這個HBase表只有一個region的話,getFirst方法返回是沒有數據的;getSecond也沒數據。this
(1)、getSplits方法中的第一個if語句段是上面說的只有一個region的狀況。每一個HBase表的第一個region的start rowkey都是EMPTY_BYTE_ARRAY,這是一個長度爲0的byte數組。table.getRegionLocation方法會找指定的rowkey所在的region所在信息HRegionLocation。若是隻有一個region的話,就先找EMPTY_BYTE_ARRAY所在region信息,若是沒有這樣的信息就是出現了錯誤;若是若是有這樣的信息的話,就構建一個長度爲1的InputSplit列表splits,構造一個TableSplit:設定HBase表名,scan起始和結束rowkey都是EMPTY_BYTE_ARRAY,再加上region所在節點,把這個TableSplit加入splits,返回這個splits。spa
(2)、接下來就是HBase表有多個region的狀況,構建長度爲keys.getFirst().length的InputSplit列表,而後遍歷keys.getFirst()獲取每一個region的位置信息並將其轉換成String類型(reverseDNS方法從reverseDNSCacheMap中獲取,reverseDNSCacheMap是存儲IPAddress => HostName的映射);而後獲取用戶設定的起始和結束rowkey,並和當前的region的起始和比較結束,若是有rowkey包含這個region就會將這region當作一個InputSplit放入列表中,最後待遍歷完以後返回split列表。判斷當前region是否應該加入InputSplit列表的條件就是循環中的最後一個if語句段,條件是:((startRow.length == 0 || keys.getSecond()[i].length == 0 ||Bytes.compareTo (startRow, keys.getSecond()[i]) < 0) && (stopRow.length == 0 || Bytes.compareTo(stopRow, keys.getFirst()[i]) > 0)),分解這個條件爲兩部分:A、(startRow.length == 0 || keys.getSecond()[i].length == 0 ||Bytes.compareTo (startRow, keys.getSecond()[i]) < 0)這個是要肯定用戶設定的start rowkey是否小於當前region的結束rowkey;B、(stopRow.length == 0 || Bytes.compareTo(stopRow, keys.getFirst()[i]) > 0)這個是要肯定用戶設定的end rowkey是否大於當前region的開始rowkey,這倆條件必須同時知足才能夠,一旦知足就要肯定這個TableSplit的開始rowkey和結束rowkey了:A、startRow.length == 0 || Bytes.compareTo(keys.getFirst()[i], startRow) >= 0若是爲true的話,說明用戶設定的起始rowkey多是從表的開頭開始或者是當前region的起始rowkey大於用戶設定的則應該將當前region的起始rowkey做爲TableSplit的起始rowkey,若是表達式爲false的話將用戶設定的起始rowkey做爲TableSplit的起始rowkey;B、(stopRow.length == 0 || Bytes.compareTo(keys.getSecond()[i], stopRow) <= 0) && keys.getSecond()[i].length > 0若是爲true的話,說明首先要肯定是否設定的結束rowkey或者當前region的結束rowkey小於用戶設定的結束rowkey,且要保證當前region不是最後一個(keys.getSecond()[i].length > 0),這樣的的TableSplit的結束rowkey就是當前region的結束rowkey,若是爲false則將用戶設定的結束rowkey爲TableSplit的結束rowkey,爲何要不是最後一個region呢?由於最後一個region的end rowkey的長度始終爲0,比較之下會將最後一個region的end rowkey設置給TableSplit,顯然這是不對,能到這裏說明這個region應該被分配給一個TableSplit,若是是最後一個region的話,那麼這個TableSplit的結束rowkey應該是用戶設定的而非這個region本身的。得到這個tableSplit的開始和結束rowkey以後就能夠封裝這個TableSplit了,並放入InputSplit列表。最終待全部的region遍歷結束以後返回這個InputSplit列表。debug
這樣getSplits方法就結束了。
2、createRecordReader方法,這個方法代碼以下:
1 /** 2 * Builds a TableRecordReader. If no TableRecordReader was provided, uses 3 * the default. 4 * 5 * @param split The split to work with. 6 * @param context The current context. 7 * @return The newly created record reader. 8 * @throws IOException When creating the reader fails. 9 * @see org.apache.hadoop.mapreduce.InputFormat#createRecordReader( 10 * org.apache.hadoop.mapreduce.InputSplit, 11 * org.apache.hadoop.mapreduce.TaskAttemptContext) 12 */ 13 @Override 14 public RecordReader<ImmutableBytesWritable, Result> createRecordReader( 15 InputSplit split, TaskAttemptContext context) 16 throws IOException { 17 if (table == null) { 18 throw new IOException("Cannot create a record reader because of a" + 19 " previous error. Please look at the previous logs lines from" + 20 " the task's full log for more details."); 21 } 22 TableSplit tSplit = (TableSplit) split; 23 TableRecordReader trr = this.tableRecordReader; 24 // if no table record reader was provided use default 25 if (trr == null) { 26 trr = new TableRecordReader(); 27 } 28 Scan sc = new Scan(this.scan); 29 sc.setStartRow(tSplit.getStartRow()); 30 sc.setStopRow(tSplit.getEndRow()); 31 trr.setScan(sc); 32 trr.setHTable(table); 33 try { 34 trr.initialize(tSplit, context); 35 } catch (InterruptedException e) { 36 throw new InterruptedIOException(e.getMessage()); 37 } 38 return trr; 39 }
這個方法主要是獲取TableSplit,而後構造一個Scan,設定開始和結束rowkey;設定HTablePool;將Scan和HTable傳遞給一個TableRecordReader對象,而後調用initialize(tSplit, context)初始化,最後返回這個TableRecordReader。能夠看出TableRecordReader這個是讀取key/value的。TableRecordReader中實際操做數據的是TableRecordReaderImpl,TableRecordReader的nextKeyValue()、getCurrentValue()、initialize、getCurrentKey()方法會調用TableRecordReaderImpl的相應方法。
TableRecordReaderImpl的initialize方法主要是從新建立一個新的Scan,並將createRecordReader傳過來的賦值給這個新的currentScan,並獲取對應的ResultScanner。
TableRecordReaderImpl的nextKeyValue()會先建立一個key = new ImmutableBytesWritable()和value = new Result(),這就是咱們繼承TableMapper中map方法中的參數類型,而後每次調用該方法經過執行value = this.scanner.next()方法來獲取HBase中的一行數據賦值給value,這裏代表若是scanner.next()運行無異常的話key中是沒有數據的(出現異常以後會存儲value對應行的rowkey),只有value有數據。執行了這個方法就能夠經過getCurrentValue()、getCurrentKey()方法來獲取value和key了。
上面這些講解了在HBase上使用MapReduce時的分片過程及如何讀取這些分片上的數據的。