While pulling HBase data through Spark, I ran into InputFormat. This article is mainly about InputFormat, and it touches on Spark, MapReduce and HBase along the way.
InputFormat is the data-source interface provided by MapReduce. Through this interface, MapReduce can read all kinds of data sources (file systems, databases and so on) and run computations over them.
With that concept in mind, let's look at the InputFormat source code.
public abstract class InputFormat<K, V> {

  /*
   * Get the partitioning of the data source: each partition is wrapped as an
   * InputSplit and a List<InputSplit> is returned.
   * Note that these are logical partitions. For example, a file with 100
   * characters split every 10 characters gives 10 partitions.
   */
  public abstract List<InputSplit> getSplits(JobContext context
                                             ) throws IOException, InterruptedException;

  /*
   * For one split, get a RecordReader. A RecordReader is essentially an enhanced
   * iterator that returns data as key/value pairs.
   * Note that it takes a single InputSplit, i.e. a single partition: the
   * iteration happens inside one partition.
   */
  public abstract RecordReader<K,V> createRecordReader(InputSplit split,
                                                       TaskAttemptContext context
                                                      ) throws IOException, InterruptedException;
}
With that, the role of the interface is roughly clear: one method defines how to partition the data, the other defines how to read data out of a partition (the toy sketch below makes this concrete). After that, let's see how it plays out in Spark.
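Here is a minimal, hypothetical InputFormat sketch, purely for illustration (the class names RangeSplit and RangeInputFormat are made up, not from any project discussed here): getSplits() carves the range [0, 100) into 10 logical partitions of 10 numbers each, mirroring the file-of-100-characters example above, and createRecordReader() iterates the numbers inside a single split.

import java.io.{DataInput, DataOutput}
import java.util.{ArrayList => JArrayList, List => JList}

import org.apache.hadoop.io.{LongWritable, NullWritable, Writable}
import org.apache.hadoop.mapreduce._

// The logical partition: a half-open range [start, end). It implements Writable
// because the framework serializes splits when shipping them to tasks.
class RangeSplit(var start: Long = 0L, var end: Long = 0L) extends InputSplit with Writable {
  override def getLength: Long = end - start
  override def getLocations: Array[String] = Array.empty            // no locality hints
  override def write(out: DataOutput): Unit = { out.writeLong(start); out.writeLong(end) }
  override def readFields(in: DataInput): Unit = { start = in.readLong(); end = in.readLong() }
}

class RangeInputFormat extends InputFormat[LongWritable, NullWritable] {

  // "How to define partitions": 10 logical splits of 10 numbers each
  override def getSplits(context: JobContext): JList[InputSplit] = {
    val splits = new JArrayList[InputSplit]()
    for (i <- 0 until 10) splits.add(new RangeSplit(i * 10L, (i + 1) * 10L))
    splits
  }

  // "How to get data from a partition": an enhanced iterator over one split
  override def createRecordReader(split: InputSplit, context: TaskAttemptContext)
      : RecordReader[LongWritable, NullWritable] =
    new RecordReader[LongWritable, NullWritable] {
      private var current = -1L
      private var end = 0L
      override def initialize(s: InputSplit, ctx: TaskAttemptContext): Unit = {
        val r = s.asInstanceOf[RangeSplit]
        current = r.start - 1
        end = r.end
      }
      override def nextKeyValue(): Boolean = { current += 1; current < end }
      override def getCurrentKey: LongWritable = new LongWritable(current)
      override def getCurrentValue: NullWritable = NullWritable.get()
      override def getProgress: Float = 0f                          // progress reporting omitted
      override def close(): Unit = ()
    }
}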
The Spark side
Reading HBase data through Spark's MapReduce-compatible API always involves code like the following:
// hbaseConfig             an HBaseConfiguration
// TableInputFormat        a subclass of InputFormat describing the input source
// ImmutableBytesWritable  the key type of the source
// Result                  the value type of the source
// If you have written MapReduce jobs before, this looks just like a MapReduce job
// configuration, except that the output is an RDD, so no output types need declaring.
val hBaseRDD = sc.newAPIHadoopRDD(hbaseConfig, classOf[TableInputFormat],
  classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],
  classOf[org.apache.hadoop.hbase.client.Result])
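For completeness, a minimal sketch of what typically goes into hbaseConfig before that call; the ZooKeeper quorum, table name and row keys below are placeholders, not values from this article.

import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.mapreduce.TableInputFormat

val hbaseConfig = HBaseConfiguration.create()
hbaseConfig.set("hbase.zookeeper.quorum", "zk1,zk2,zk3")     // where the cluster lives
hbaseConfig.set(TableInputFormat.INPUT_TABLE, "my_table")    // which table to scan
// Optionally narrow the scan; TableInputFormat reads these keys when it builds its Scan
hbaseConfig.set(TableInputFormat.SCAN_ROW_START, "row-000")
hbaseConfig.set(TableInputFormat.SCAN_ROW_STOP, "row-999")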
So what does this flow actually look like?
First, SparkContext creates an RDD:
new NewHadoopRDD(this, fClass, kClass, vClass, jconf)
…and that's it.
This is Spark's scheduling at work: a job is only actually submitted when an action is hit, which I won't go into here. Skipping past that, look at NewHadoopRDD. Its two key methods, compute() and getPartitions(), correspond one-to-one with the two methods of InputFormat.
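A quick illustration of that laziness, reusing the hBaseRDD defined earlier:

// Defining the RDD reads nothing from HBase; only an action submits a job,
// and that is when getPartitions() and compute() below actually run
val rowCount = hBaseRDD.count()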
·getPartitions()
override def getPartitions: Array[Partition] = {
  // Instantiate the InputFormat, i.e. the TableInputFormat we passed in
  // (it could be any other InputFormat; that is just the example at hand)
  val inputFormat = inputFormatClass.newInstance
  inputFormat match {
    case configurable: Configurable =>
      configurable.setConf(_conf)
    case _ =>
  }
  val jobContext = new JobContextImpl(_conf, jobId)
  // Get all the splits
  val rawSplits = inputFormat.getSplits(jobContext).toArray
  // Get the total number of partitions and translate them into Spark's terms
  val result = new Array[Partition](rawSplits.size)
  for (i <- 0 until rawSplits.size) {
    // Wrap each split as a partition
    result(i) = new NewHadoopPartition(id, i, rawSplits(i).asInstanceOf[InputSplit with Writable])
  }
  result
}
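Since (as the HBase section below shows) TableInputFormat produces one split per region, the resulting RDD ends up with one Spark partition per region. A quick way to confirm this, assuming the hBaseRDD from earlier:

println(hBaseRDD.getNumPartitions)   // equals the number of regions being scanned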
·compute()
Since the full code would be painful to read, here are just the key parts.
// Same as before: instantiate the InputFormat
private val format = inputFormatClass.newInstance
format match {
  case configurable: Configurable =>
    configurable.setConf(conf)
  case _ =>
}
// Satisfy everything MapReduce asks for...
private val attemptId = new TaskAttemptID(jobTrackerId, id, TaskType.MAP, split.index, 0)
private val hadoopAttemptContext = new TaskAttemptContextImpl(conf, attemptId)
private var finished = false
private var reader =
  try {
    // Get the all-important RecordReader
    val _reader = format.createRecordReader(
      split.serializableHadoopSplit.value, hadoopAttemptContext)
    _reader.initialize(split.serializableHadoopSplit.value, hadoopAttemptContext)
    _reader
  } catch {
    case e: IOException if ignoreCorruptFiles =>
      logWarning(
        s"Skipped the rest content in the corrupted file: ${split.serializableHadoopSplit}",
        e)
      finished = true
      null
  }

// The familiar hasNext and next
override def hasNext: Boolean = {
  if (!finished && !havePair) {
    try {
      finished = !reader.nextKeyValue
    } catch {
      case e: IOException if ignoreCorruptFiles =>
        logWarning(
          s"Skipped the rest content in the corrupted file: ${split.serializableHadoopSplit}",
          e)
        finished = true
    }
    if (finished) {
      // Close and release the reader here; close() will also be called when the task
      // completes, but for tasks that read from many files, it helps to release the
      // resources early.
      close()
    }
    havePair = !finished
  }
  !finished
}

override def next(): (K, V) = {
  if (!hasNext) {
    throw new java.util.NoSuchElementException("End of stream")
  }
  havePair = false
  if (!finished) {
    inputMetrics.incRecordsRead(1)
  }
  if (inputMetrics.recordsRead % SparkHadoopUtil.UPDATE_INPUT_METRICS_INTERVAL_RECORDS == 0) {
    updateBytesRead()
  }
  (reader.getCurrentKey, reader.getCurrentValue)
}
Countless lines are omitted; the gist is that the RecordReader gets wrapped into an Iterator (couldn't MapReduce just have used Iterator as the interface in the first place...).
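The pattern itself is simple. Here is a stripped-down sketch of adapting a RecordReader to a Scala Iterator, as an illustration of the idea only, not Spark's actual class:

import org.apache.hadoop.mapreduce.RecordReader

class RecordReaderIterator[K, V](reader: RecordReader[K, V]) extends Iterator[(K, V)] {
  private var finished = false
  private var havePair = false

  override def hasNext: Boolean = {
    if (!finished && !havePair) {
      finished = !reader.nextKeyValue()   // advance the underlying reader exactly once
      if (finished) reader.close()        // release scanners/connections early
      havePair = !finished
    }
    !finished
  }

  override def next(): (K, V) = {
    if (!hasNext) throw new java.util.NoSuchElementException("End of stream")
    havePair = false
    (reader.getCurrentKey, reader.getCurrentValue)
  }
}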
That is roughly all Spark does; the rest is HBase's job.
The HBase side
TableInputFormat is provided by HBase for MapReduce compatibility; little did it expect that Spark, of all things, would come along and hijack it.
Going straight to the point, here is the key code behind TableInputFormat (the splitting logic actually lives in its parent class, TableInputFormatBase):
·getSplits()
RegionSizeCalculator sizeCalculator =
    new RegionSizeCalculator(getRegionLocator(), getAdmin());

TableName tableName = getTable().getName();

Pair<byte[][], byte[][]> keys = getStartEndKeys();
if (keys == null || keys.getFirst() == null ||
    keys.getFirst().length == 0) {
  HRegionLocation regLoc =
      getRegionLocator().getRegionLocation(HConstants.EMPTY_BYTE_ARRAY, false);
  if (null == regLoc) {
    throw new IOException("Expecting at least one region.");
  }
  List<InputSplit> splits = new ArrayList<>(1);
  // Get the size of the region; each region becomes one split, i.e. one partition
  long regionSize = sizeCalculator.getRegionSize(regLoc.getRegionInfo().getRegionName());
  // Create the TableSplit, which is the InputSplit
  TableSplit split = new TableSplit(tableName, scan,
      HConstants.EMPTY_BYTE_ARRAY, HConstants.EMPTY_BYTE_ARRAY, regLoc
      .getHostnamePort().split(Addressing.HOSTNAME_PORT_SEPARATOR)[0], regionSize);
  splits.add(split);
·createRecordReader()
final TableRecordReader trr =
    this.tableRecordReader != null ? this.tableRecordReader : new TableRecordReader();
Scan sc = new Scan(this.scan);
sc.setStartRow(tSplit.getStartRow());
sc.setStopRow(tSplit.getEndRow());
trr.setScan(sc);
trr.setTable(getTable());
return new RecordReader<ImmutableBytesWritable, Result>() {

  @Override
  public void close() throws IOException {
    trr.close();
    closeTable();
  }

  @Override
  public ImmutableBytesWritable getCurrentKey() throws IOException, InterruptedException {
    return trr.getCurrentKey();
  }

  @Override
  public Result getCurrentValue() throws IOException, InterruptedException {
    return trr.getCurrentValue();
  }

  @Override
  public float getProgress() throws IOException, InterruptedException {
    return trr.getProgress();
  }

  @Override
  public void initialize(InputSplit inputsplit, TaskAttemptContext context)
      throws IOException, InterruptedException {
    trr.initialize(inputsplit, context);
  }

  @Override
  public boolean nextKeyValue() throws IOException, InterruptedException {
    return trr.nextKeyValue();
  }
};
This one should be fairly self-explanatory: an elaborate way of building a RecordReader that delegates to TableRecordReader.
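Putting the two sides together: every key/value pair this reader produces surfaces in the RDD as an (ImmutableBytesWritable, Result) pair. A hypothetical consumption example, where the column family and qualifier ("cf", "col") are placeholders:

import org.apache.hadoop.hbase.util.Bytes

val values = hBaseRDD.map { case (rowKey, result) =>
  val key = Bytes.toString(rowKey.copyBytes())
  val col = Bytes.toString(result.getValue(Bytes.toBytes("cf"), Bytes.toBytes("col")))
  (key, col)
}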
To stay compatible with MapReduce, Spark exposes interfaces like hadoopRDD()/newAPIHadoopRDD(); to stay compatible with MapReduce, HBase provides classes like TableInputFormat. Put together, that is how Spark can read data from HBase, though of course it is not the only way.