<iframe width="800" height="500" src="//player.bilibili.com/player.html?aid=37442139&cid=66303785&page=28" scrolling="no" border="0" frameborder="no" framespacing="0" allowfullscreen="true"> </iframe>
The test file used in the debugging session below contains the following sample data:

a b k l j c a n m o
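A minimal way to reproduce the setup is to read such a file through sc.textFile with an explicit minimum partition count. The path and partition count below are illustrative assumptions, not values from the original session:

```scala
import org.apache.spark.{SparkConf, SparkContext}

object TextFileSplitDemo {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(
      new SparkConf().setMaster("local[2]").setAppName("split-demo"))
    // Hypothetical path; any small text file works for stepping through HadoopRDD.compute.
    val rdd = sc.textFile("hdfs://localhost:9000/tmp/demo.txt", 2)
    // Each element is the Text value produced by LineRecordReader.next();
    // the LongWritable byte-offset key is dropped by textFile.
    rdd.collect().foreach(println)
    sc.stop()
  }
}
```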
HadoopRDD.compute() is where a partition's records are actually produced. It builds the RecordReader and wraps it in an iterator:

```scala
override def compute(theSplit: Partition, context: TaskContext): InterruptibleIterator[(K, V)] = {
  val iter = new NextIterator[(K, V)] {

    val split = theSplit.asInstanceOf[HadoopPartition]
    logInfo("Input split: " + split.inputSplit)
    val jobConf = getJobConf()

    val inputMetrics = context.taskMetrics
      .getInputMetricsForReadMethod(DataReadMethod.Hadoop)

    // Sets the thread local variable for the file's name
    split.inputSplit.value match {
      case fs: FileSplit =>
        SqlNewHadoopRDDState.setInputFileName(fs.getPath.toString)
      case _ =>
        SqlNewHadoopRDDState.unsetInputFileName()
    }

    // Find a function that will return the FileSystem bytes read by this thread. Do this before
    // creating RecordReader, because RecordReader's constructor might read some bytes
    val bytesReadCallback = inputMetrics.bytesReadCallback.orElse {
      split.inputSplit.value match {
        case _: FileSplit | _: CombineFileSplit =>
          SparkHadoopUtil.get.getFSBytesReadOnThreadCallback()
        case _ => None
      }
    }
    inputMetrics.setBytesReadCallback(bytesReadCallback)

    var reader: RecordReader[K, V] = null
    // Returns the TextInputFormat instance
    val inputFormat = getInputFormat(jobConf)
    HadoopRDD.addLocalConfiguration(new SimpleDateFormat("yyyyMMddHHmm").format(createTime),
      context.stageId, theSplit.index, context.attemptNumber, jobConf)
    // Instantiates org.apache.hadoop.mapred.LineRecordReader. The LineRecordReader
    // constructor also recomputes the actual start position of the current partition
    // (it can differ from the planned split boundary).
    reader = inputFormat.getRecordReader(split.inputSplit.value, jobConf, Reporter.NULL)

    // Register an on-task-completion callback to close the input stream.
    context.addTaskCompletionListener { context => closeIfNeeded() }
    val key: K = reader.createKey()
    val value: V = reader.createValue()

    override def getNext(): (K, V) = {
      try {
        // Calls org.apache.hadoop.mapred.LineRecordReader.next()
        finished = !reader.next(key, value)
      } catch {
        case _: EOFException if ignoreCorruptFiles => finished = true
      }
      if (!finished) {
        inputMetrics.incRecordsRead(1)
      }
      // Returns the current (key, value) pair
      (key, value)
    }

    override def close() {
      if (reader != null) {
        SqlNewHadoopRDDState.unsetInputFileName()
        // Close the reader and release it. Note: it's very important that we don't close the
        // reader more than once, since that exposes us to MAPREDUCE-5918 when running against
        // Hadoop 1.x and older Hadoop 2.x releases. That bug can lead to non-deterministic
        // corruption issues when reading compressed input.
        try {
          reader.close()
        } catch {
          case e: Exception =>
            if (!ShutdownHookManager.inShutdown()) {
              logWarning("Exception in RecordReader.close()", e)
            }
        } finally {
          reader = null
        }
        if (bytesReadCallback.isDefined) {
          inputMetrics.updateBytesRead()
        } else if (split.inputSplit.value.isInstanceOf[FileSplit] ||
                   split.inputSplit.value.isInstanceOf[CombineFileSplit]) {
          // If we can't get the bytes read from the FS stats, fall back to the split size,
          // which may be inaccurate.
          try {
            inputMetrics.incBytesRead(split.inputSplit.value.getLength)
          } catch {
            case e: java.io.IOException =>
              logWarning("Unable to get input size to set InputMetrics for task", e)
          }
        }
      }
    }
  }
  new InterruptibleIterator[(K, V)](context, iter)
}
```
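compute() wraps all of the per-record work in Spark's NextIterator, which adapts the pull-style RecordReader into a standard Scala Iterator with a close hook. A minimal sketch of that pattern (a simplified stand-in, not Spark's actual org.apache.spark.util.NextIterator) looks like this:

```scala
import java.io.{BufferedReader, FileReader}

// Simplified stand-in for org.apache.spark.util.NextIterator.
abstract class SimpleNextIterator[T] extends Iterator[T] {
  protected var finished = false
  private var gotNext = false
  private var nextValue: T = _

  protected def getNext(): T   // must set finished = true when exhausted
  protected def close(): Unit

  override def hasNext: Boolean = {
    if (!finished && !gotNext) {
      nextValue = getNext()
      if (finished) close()
      gotNext = true
    }
    !finished
  }

  override def next(): T = {
    if (!hasNext) throw new NoSuchElementException("End of stream")
    gotNext = false
    nextValue
  }
}

// Usage: iterate a file line by line, closing the reader at EOF.
def lines(path: String): Iterator[String] = new SimpleNextIterator[String] {
  private val reader = new BufferedReader(new FileReader(path))
  override protected def getNext(): String = {
    val line = reader.readLine()
    if (line == null) { finished = true; null } else line
  }
  override protected def close(): Unit = reader.close()
}
```

Spark's real NextIterator additionally tracks whether close() has already run, which is what the closeIfNeeded() call in the task-completion listener relies on.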
inputFormat.getRecordReader(...) above resolves to org.apache.hadoop.mapred.TextInputFormat.getRecordReader, which constructs the LineRecordReader and passes along any configured record delimiter:

```java
public RecordReader<LongWritable, Text> getRecordReader(
    InputSplit genericSplit, JobConf job, Reporter reporter) throws IOException {
  reporter.setStatus(genericSplit.toString());
  String delimiter = job.get("textinputformat.record.delimiter");
  byte[] recordDelimiterBytes = null;
  if (null != delimiter) {
    recordDelimiterBytes = delimiter.getBytes(Charsets.UTF_8);
  }
  return new LineRecordReader(job, (FileSplit) genericSplit, recordDelimiterBytes);
}
```
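Because getRecordReader consults textinputformat.record.delimiter, records do not have to be newline-terminated. A hedged example of setting a custom delimiter before reading (the delimiter and path here are made up for illustration):

```scala
// Setting a custom record delimiter; LineRecordReader will then take the
// readCustomLine path instead of readDefaultLine.
sc.hadoopConfiguration.set("textinputformat.record.delimiter", "||")
val records = sc.textFile("hdfs://localhost:9000/tmp/demo.txt")
```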
The LineRecordReader constructor opens the file, seeks to the planned start of the split, and, for every split except the first, advances past the first newline so the partial first line is left to the previous split:

```java
public LineRecordReader(Configuration job, FileSplit split,
    byte[] recordDelimiter) throws IOException {
  this.maxLineLength = job.getInt(org.apache.hadoop.mapreduce.lib.input.
    LineRecordReader.MAX_LINE_LENGTH, Integer.MAX_VALUE);
  start = split.getStart();
  end = start + split.getLength();
  final Path file = split.getPath();
  compressionCodecs = new CompressionCodecFactory(job);
  codec = compressionCodecs.getCodec(file);

  // open the file and seek to the start of the split
  final FileSystem fs = file.getFileSystem(job);
  fileIn = fs.open(file);
  if (isCompressedInput()) {
    decompressor = CodecPool.getDecompressor(codec);
    if (codec instanceof SplittableCompressionCodec) {
      final SplitCompressionInputStream cIn =
        ((SplittableCompressionCodec)codec).createInputStream(
          fileIn, decompressor, start, end,
          SplittableCompressionCodec.READ_MODE.BYBLOCK);
      in = new CompressedSplitLineReader(cIn, job, recordDelimiter);
      start = cIn.getAdjustedStart();
      end = cIn.getAdjustedEnd();
      filePosition = cIn; // take pos from compressed stream
    } else {
      in = new SplitLineReader(codec.createInputStream(fileIn,
          decompressor), job, recordDelimiter);
      filePosition = fileIn;
    }
  } else {
    // Seek to the planned start offset of this partition's split.
    fileIn.seek(start);
    in = new UncompressedSplitLineReader(
        fileIn, job, recordDelimiter, split.getLength());
    filePosition = fileIn;
  }
  // If this is not the first split, we always throw away first record
  // because we always (except the last split) read one extra line in
  // next() method.
  if (start != 0) {
    // in.readLine() here resolves to UncompressedSplitLineReader.readLine();
    // note that maxLineLength is passed as 0, so nothing is stored in the Text.
    // The actual start of this partition = the planned split start + the bytes
    // up to and including the first newline: everything before that newline
    // belongs to the previous partition, and the byte right after it is where
    // this partition really begins.
    start += in.readLine(new Text(), 0, maxBytesToConsume(start));
  }
  this.pos = start;
}
```
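The effect of the start += in.readLine(...) adjustment is easy to check without Hadoop: for any split other than the first, the actual start is the planned start plus the distance to just past the first newline. A small self-contained illustration (plain Scala, not the Hadoop classes; the two-line file content is an assumption for the example):

```scala
// Planned splits cut the file at arbitrary byte offsets; every split except
// the first skips forward past the first newline before producing records.
def adjustedStart(data: Array[Byte], plannedStart: Int): Int = {
  if (plannedStart == 0) 0
  else {
    var i = plannedStart
    while (i < data.length && data(i) != '\n') i += 1
    math.min(i + 1, data.length) // first byte after the newline
  }
}

val bytes = "a b k l j\nc a n m o\n".getBytes("UTF-8")
// A planned split starting at byte 4 (mid-line) actually starts at byte 10.
assert(adjustedStart(bytes, 4) == 10)
```

This is also why no line is ever split across two partitions: the split containing a line's first byte reads the whole line, and the following split skips past it.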
Back in HadoopRDD.compute, each call to getNext() pulls one record through the reader:

```scala
override def getNext(): (K, V) = {
  try {
    finished = !reader.next(key, value)
  } catch {
    case _: EOFException if ignoreCorruptFiles => finished = true
  }
  if (!finished) {
    inputMetrics.incRecordsRead(1)
  }
  (key, value)
}
```
LineRecordReader.next() reads one line per call. The loop condition getFilePosition() <= end is why a record that merely starts inside the split is read in full, even when it ends past the split boundary:

```java
/** Read a line. */
public synchronized boolean next(LongWritable key, Text value)
    throws IOException {
  // We always read one extra line, which lies outside the upper
  // split limit i.e. (end - 1)
  // getFilePosition() equals pos here
  while (getFilePosition() <= end || in.needAdditionalRecordAfterSplit()) {
    key.set(pos); // the key is the byte offset at which this record starts
    int newSize = 0;
    if (pos == 0) {
      // first partition (split 0)
      newSize = skipUtfByteOrderMark(value);
    } else {
      newSize = in.readLine(value, maxLineLength, maxBytesToConsume(pos));
      pos += newSize;
    }
    if (newSize == 0) {
      return false;
    }
    if (newSize < maxLineLength) {
      return true;
    }
    // line too long. try again
    LOG.info("Skipped line of size " + newSize + " at pos "
        + (pos - newSize));
  }
  return false;
}
```
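Combining the two rules (a non-zero split first skips its partial first line, and next() keeps reading while the record's start offset is <= end), every line ends up in exactly the split where its first byte lies. A small simulation of that bookkeeping, in plain Scala rather than the Hadoop classes, using the sample tokens rearranged onto two lines and hypothetical 10-byte splits:

```scala
// Assign each line of `data` to a split using LineRecordReader's rules:
// a split [start, end] emits every line whose first byte lies in [start, end],
// except that a non-zero split first skips past its partial first line.
def linesForSplit(data: String, start: Int, end: Int): Seq[String] = {
  var pos =
    if (start == 0) 0
    else {
      val nl = data.indexOf('\n', start)
      if (nl < 0) return Seq.empty // no newline left: nothing starts here
      nl + 1
    }
  val out = scala.collection.mutable.Buffer[String]()
  while (pos <= end && pos < data.length) {
    val nl = data.indexOf('\n', pos)
    val stop = if (nl < 0) data.length else nl
    out += data.substring(pos, stop)
    pos = stop + 1
  }
  out.toSeq
}

val data = "a b k l j\nc a n m o\n"
// Two planned 10-byte splits: the second line starts exactly at offset 10,
// which is still <= end for split 0, so split 0 reads it and split 1 is empty.
assert(linesForSplit(data, 0, 10) == Seq("a b k l j", "c a n m o"))
assert(linesForSplit(data, 10, 20) == Seq())
```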
in.readLine(...) above goes through UncompressedSplitLineReader, which allows at most one more record to begin after the split has been exhausted:

```java
@Override
public int readLine(Text str, int maxLineLength, int maxBytesToConsume)
    throws IOException {
  int bytesRead = 0;
  if (!finished) {
    // only allow at most one more record to be read after the stream
    // reports the split ended
    if (totalBytesRead > splitLength) {
      finished = true;
    }
    bytesRead = super.readLine(str, maxLineLength, maxBytesToConsume);
  }
  return bytesRead;
}
```
```java
/**
 * Read one line from the InputStream into the given Text.
 *
 * @param str the object to store the given line (without newline)
 * @param maxLineLength the maximum number of bytes to store into str;
 *  the rest of the line is silently discarded.
 * @param maxBytesToConsume the maximum number of bytes to consume
 *  in this call. This is only a hint, because if the line cross
 *  this threshold, we allow it to happen. It can overshoot
 *  potentially by as much as one buffer length.
 *
 * @return the number of bytes read including the (longest) newline
 * found.
 *
 * @throws IOException if the underlying stream throws
 */
public int readLine(Text str, int maxLineLength,
                    int maxBytesToConsume) throws IOException {
  if (this.recordDelimiterBytes != null) {
    return readCustomLine(str, maxLineLength, maxBytesToConsume);
  } else {
    return readDefaultLine(str, maxLineLength, maxBytesToConsume);
  }
}
```
```java
/**
 * Read a line terminated by one of CR, LF, or CRLF.
 * When maxLineLength is 0, i.e. when a partition other than the first is
 * locating its real start position, this method consumes bytes up to the
 * first newline without storing anything into str.
 */
private int readDefaultLine(Text str, int maxLineLength, int maxBytesToConsume)
    throws IOException {
  /* We're reading data from in, but the head of the stream may be
   * already buffered in buffer, so we have several cases:
   * 1. No newline characters are in the buffer, so we need to copy
   *    everything and read another buffer from the stream.
   * 2. An unambiguously terminated line is in buffer, so we just
   *    copy to str.
   * 3. Ambiguously terminated line is in buffer, i.e. buffer ends
   *    in CR. In this case we copy everything up to CR to str, but
   *    we also need to see what follows CR: if it's LF, then we
   *    need consume LF as well, so next call to readLine will read
   *    from after that.
   * We use a flag prevCharCR to signal if previous character was CR
   * and, if it happens to be at the end of the buffer, delay
   * consuming it until we have a chance to look at the char that
   * follows.
   */
  str.clear();
  int txtLength = 0; //tracks str.getLength(), as an optimization
  int newlineLength = 0; //length of terminating newline
  boolean prevCharCR = false; //true of prev char was CR
  long bytesConsumed = 0;
  do {
    int startPosn = bufferPosn; //starting from where we left off the last time
    if (bufferPosn >= bufferLength) {
      startPosn = bufferPosn = 0;
      if (prevCharCR) {
        // bytesConsumed: total bytes consumed so far (including newlines)
        ++bytesConsumed; //account for CR from previous read
      }
      // fillBuffer() is what actually reads from the HDFS stream:
      // buffer is the read buffer, bufferLength the number of bytes read this time.
      bufferLength = fillBuffer(in, buffer, prevCharCR);
      if (bufferLength <= 0) {
        break; // EOF
      }
    }
    // Scan the bytes just read, looking for the first newline. bufferPosn is
    // the index where it is found; the value is kept across calls within the
    // same partition.
    for (; bufferPosn < bufferLength; ++bufferPosn) { //search for newline
      if (buffer[bufferPosn] == LF) {
        // With LF-only input prevCharCR stays false, so newlineLength = 1 here.
        newlineLength = (prevCharCR) ? 2 : 1;
        ++bufferPosn; // at next invocation proceed from following byte
        break;
      }
      if (prevCharCR) { //CR + notLF, we are at notLF
        newlineLength = 1;
        break;
      }
      // On Linux test data there are no '\r' bytes, so prevCharCR remains false.
      prevCharCR = (buffer[bufferPosn] == CR);
    }
    int readLength = bufferPosn - startPosn; // bytes consumed this pass (including the newline)
    if (prevCharCR && newlineLength == 0) {
      --readLength; //CR at the end of the buffer
    }
    // Running total of bytes consumed (including newlines).
    bytesConsumed += readLength;
    // Bytes to append to str this pass (excluding the newline).
    int appendLength = readLength - newlineLength;
    if (appendLength > maxLineLength - txtLength) {
      // Cap what we store at maxLineLength. In particular, when
      // maxLineLength = 0 and txtLength = 0 (locating a split start),
      // nothing needs to be stored, so appendLength becomes 0.
      appendLength = maxLineLength - txtLength;
    }
    if (appendLength > 0) {
      // Append the bytes to str; this is the line content returned to the caller.
      str.append(buffer, startPosn, appendLength);
      // txtLength accumulates the bytes actually stored (excluding newlines).
      txtLength += appendLength;
    }
    // Loop while no newline has been found and we have not yet consumed
    // more than maxBytesToConsume bytes.
  } while (newlineLength == 0 && bytesConsumed < maxBytesToConsume);

  if (bytesConsumed > Integer.MAX_VALUE) {
    throw new IOException("Too many bytes before newline: " + bytesConsumed);
  }
  return (int)bytesConsumed;
}
```
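The maxLineLength = 0 trick from the constructor falls straight out of this code: appendLength = maxLineLength - txtLength clamps to 0, so bytes are consumed up to the newline but nothing is appended to str. A condensed re-implementation of the default-delimiter loop over a single in-memory buffer (no refills, and CR/CRLF handling omitted for brevity) shows the arithmetic:

```scala
// Condensed readDefaultLine over one in-memory buffer: returns bytes consumed
// (including the newline) and appends at most maxLineLength bytes to `str`.
def readDefaultLine(buffer: Array[Byte], startPosn: Int, maxLineLength: Int,
                    str: StringBuilder): Int = {
  val LF = '\n'.toByte
  var bufferPosn = startPosn
  var newlineLength = 0
  while (bufferPosn < buffer.length && newlineLength == 0) {
    if (buffer(bufferPosn) == LF) newlineLength = 1
    bufferPosn += 1
  }
  val readLength = bufferPosn - startPosn            // consumed, incl. newline
  val appendLength = math.min(readLength - newlineLength, maxLineLength)
  if (appendLength > 0)
    str.append(new String(buffer, startPosn, appendLength, "UTF-8"))
  readLength
}

val buf = "a b k l j\nc a n m o\n".getBytes("UTF-8")
val sb = new StringBuilder
// With maxLineLength = 0 nothing is stored, but 10 bytes are still consumed:
assert(readDefaultLine(buf, 0, 0, sb) == 10 && sb.isEmpty)
// With a generous limit the line itself (9 bytes, no newline) is stored:
assert(readDefaultLine(buf, 0, Int.MaxValue, sb) == 10 && sb.toString == "a b k l j")
```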
readDefaultLine calls fillBuffer() to pull bytes off the stream; each read is capped so that totalBytesRead can land exactly on the split's planned length:

```java
protected int fillBuffer(InputStream in, byte[] buffer, boolean inDelimiter)
    throws IOException {
  int maxBytesToRead = buffer.length; // buffer size, 64KB by default
  // splitLength: the planned length of this partition's split.
  // totalBytesRead: total bytes this partition has read so far.
  if (totalBytesRead < splitLength) {
    // The planned split has not been fully read yet; keep reading the remainder.
    long leftBytesForSplit = splitLength - totalBytesRead;
    // check if leftBytesForSplit exceed Integer.MAX_VALUE
    if (leftBytesForSplit <= Integer.MAX_VALUE) {
      // If the remaining bytes fit in an int, read at most
      // min(default 64KB buffer, bytes remaining in the split).
      maxBytesToRead = Math.min(maxBytesToRead, (int)leftBytesForSplit);
    }
  }
  // Number of bytes actually read.
  int bytesRead = in.read(buffer, 0, maxBytesToRead);

  // If the split ended in the middle of a record delimiter then we need
  // to read one additional record, as the consumer of the next split will
  // not recognize the partial delimiter as a record.
  // However if using the default delimiter and the next character is a
  // linefeed then next split will treat it as a delimiter all by itself
  // and the additional record read should not be performed.
  if (totalBytesRead == splitLength && inDelimiter && bytesRead > 0) {
    if (usingCRLF) {
      needAdditionalRecord = (buffer[0] != '\n');
    } else {
      needAdditionalRecord = true;
    }
  }
  if (bytesRead > 0) {
    // Data was read; add it to this partition's running total.
    totalBytesRead += bytesRead;
  }
  return bytesRead;
}
```
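The clamping at the top is what makes totalBytesRead land exactly on splitLength instead of overshooting, so the "one extra record" logic can trigger at precisely the split boundary. A tiny sketch of that computation with hypothetical numbers:

```scala
// How fillBuffer caps each read so a split never silently runs past its end.
def maxBytesToRead(bufferLength: Int, splitLength: Long, totalBytesRead: Long): Int = {
  if (totalBytesRead < splitLength) {
    val leftBytesForSplit = splitLength - totalBytesRead
    if (leftBytesForSplit <= Int.MaxValue) math.min(bufferLength, leftBytesForSplit.toInt)
    else bufferLength
  } else bufferLength // already past the split end: reading the one extra record
}

// With 10 of 20 split bytes consumed, only 10 more are requested, not 64KB.
assert(maxBytesToRead(65536, splitLength = 20L, totalBytesRead = 10L) == 10)
assert(maxBytesToRead(65536, splitLength = 1L << 20, totalBytesRead = 0L) == 65536)
```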