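In the previous post I finished studying the Split process, which is fairly simple. Next up is the Map operation. Like Reduce, it is a core part of MapReduce, so in this post I will walk through its internal implementation in detail. First, there are four kinds of map task, which some readers may already know: Job-setup Task, Job-cleanup Task, Task-cleanup Task and Map Task. The first three are auxiliary tasks and not the focus of this post. I will cover the most important one, the Map Task.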
The Map Task runs in five phases:

Read -----> Map -----> Collect -----> Spill -----> Combine
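Here is a sequence diagram that lays it out simply:

[Figure: sequence diagram of the Map Task phases]

In the code analysis that follows you will see the call sequence of each method.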
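Before analyzing the whole process, we need to understand a few internal structures. The MapTask class is the carrier of a Map Task; its class relationships are as follows:

[Figure: MapTask class diagram]

What gets invoked is its run method, which starts the map task. The corresponding code: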
```java
/**
 * Main execution flow of a MapTask
 */
@Override
public void run(final JobConf job, final TaskUmbilicalProtocol umbilical)
    throws IOException, ClassNotFoundException, InterruptedException {
  this.umbilical = umbilical;

  // start thread that will handle communication with parent
  // (it sends task status reports to the parent process)
  TaskReporter reporter = new TaskReporter(getProgress(), umbilical, jvmContext);
  reporter.startCommunicationThread();
  // determine whether the new (mapreduce) or the old (mapred) API is used
  boolean useNewApi = job.getUseNewMapper();
  initialize(job, getJobID(), reporter, useNewApi);

  // check if it is a cleanupJobTask
  // 4 kinds of map task: Job-setup Task, Job-cleanup Task, Task-cleanup Task and Map Task
  if (jobCleanup) {
    // run a Job-cleanup Task
    runJobCleanupTask(umbilical, reporter);
    return;
  }
  if (jobSetup) {
    // run a Job-setup Task
    runJobSetupTask(umbilical, reporter);
    return;
  }
  if (taskCleanup) {
    // run a Task-cleanup Task
    runTaskCleanupTask(umbilical, reporter);
    return;
  }

  // none of the three above: an ordinary Map Task, dispatched on new vs. old API
  if (useNewApi) {
    runNewMapper(job, splitMetaInfo, umbilical, reporter);
  } else {
    // the old API path, followed in this post; splitMetaInfo describes the
    // input split produced earlier by the InputFormat
    runOldMapper(job, splitMetaInfo, umbilical, reporter);
  }
  done(umbilical, reporter);
}
```

Everything I study here uses the old API, so we follow runOldMapper.
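One aside before going further: everything that follows revolves around a MapRunnable, a kind of proxy that drives the user's map function. You can replace the whole process behind map with your own implementation, or use the flow the system ships with.

The system already provides a complete implementation, MapRunner: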
```java
public void run(RecordReader<K1, V1> input, OutputCollector<K2, V2> output,
                Reporter reporter) throws IOException {
  try {
    // allocate key & value instances that are re-used for all entries
    K1 key = input.createKey();
    V1 value = input.createValue();

    // pull each key-value pair from the RecordReader and call the user's map function
    while (input.next(key, value)) {
      // map pair to output
      mapper.map(key, value, output, reporter);
      if (incrProcCount) {
        reporter.incrCounter(SkipBadRecords.COUNTER_GROUP,
            SkipBadRecords.COUNTER_MAP_PROCESSED_RECORDS, 1);
      }
    }
  } finally {
    // all records done: close the mapper
    mapper.close();
  }
}
```

So the map phase is simply a loop that calls the user-defined map function over and over. With these prerequisites in place we can dig deeper. Back in runOldMapper, the first thing that happens is the first phase of the Map Task: Read.
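Note that the loop above calls createKey() and createValue() once and then lets next() refill the same objects for every record. Below is a minimal sketch of the consequence, using hypothetical toy types (MutableText, ToyReader, ArrayReader and ReuseDemo are not Hadoop APIs) and modeling only the value for brevity: a map function that keeps the reference instead of copying it ends up holding N references to the last record.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical toy types standing in for a Writable value and a RecordReader.
final class MutableText {
    String value = "";
}

interface ToyReader {
    MutableText createValue();
    boolean next(MutableText value); // refills `value` in place; false at end of input
}

final class ArrayReader implements ToyReader {
    private final String[] lines;
    private int pos = 0;

    ArrayReader(String... lines) { this.lines = lines; }

    @Override public MutableText createValue() { return new MutableText(); }

    @Override public boolean next(MutableText value) {
        if (pos >= lines.length) return false;
        value.value = lines[pos++];
        return true;
    }
}

final class ReuseDemo {
    // Same shape as MapRunner.run(): one value object, refilled by every next() call.
    // The "map function" here just remembers what it was given, with or without copying.
    static List<String> run(ToyReader in, boolean copy) {
        List<MutableText> kept = new ArrayList<>();
        MutableText value = in.createValue();
        while (in.next(value)) {
            if (copy) {
                MutableText snapshot = new MutableText();
                snapshot.value = value.value;
                kept.add(snapshot);
            } else {
                kept.add(value); // stores the shared, reused object
            }
        }
        List<String> seen = new ArrayList<>();
        for (MutableText t : kept) seen.add(t.value);
        return seen;
    }
}
```

Without copying, reading "a", "b", "c" leaves [c, c, c]; with copying it leaves [a, b, c]. This is why a map function that buffers its input should copy each key and value before storing it.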
The Read phase reads key-value pairs one at a time from the RecordReader and hands them to the Map phase, which runs the map function on them.
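What a RecordReader returns depends on the InputFormat. For the common TextInputFormat, each key is the byte offset of a line within the file and each value is the line itself. A split boundary usually falls in the middle of a line, so the reader skips a partial first line (unless the split starts the file) and reads past the split end to finish its last line. The sketch below models that boundary rule over an in-memory byte array; the class LineSplitReader and its "back up one byte" technique are assumptions of this sketch, it treats only '\n' as a line end, and it is not Hadoop's LineRecordReader.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Simplified, hypothetical model of how a line-oriented RecordReader reads one split.
final class LineSplitReader {
    // One record: key = byte offset of the line, value = the line's text.
    static final class Rec {
        final long offset;
        final String line;
        Rec(long offset, String line) { this.offset = offset; this.line = line; }
        @Override public String toString() { return offset + ":" + line; }
    }

    // Reads the split [start, start + length) of `data`.
    static List<Rec> read(byte[] data, int start, int length) {
        int end = start + length;
        int pos = start;
        if (start != 0) {
            // Back up one byte and discard through the next '\n': a line that began in
            // the previous split is finished by that split's reader, not this one.
            pos = start - 1;
            while (pos < data.length && data[pos] != '\n') pos++;
            pos++;
        }
        List<Rec> out = new ArrayList<>();
        // Read every line that starts before `end`, even if it runs past the boundary.
        while (pos < end && pos < data.length) {
            int lineStart = pos;
            while (pos < data.length && data[pos] != '\n') pos++;
            out.add(new Rec(lineStart,
                    new String(data, lineStart, pos - lineStart, StandardCharsets.UTF_8)));
            pos++; // step over the '\n'
        }
        return out;
    }
}
```

Reading "ab\ncd\nef" as the splits [0, 4) and [4, 8) yields 0:ab, 3:cd and then 6:ef, so every line is read exactly once even though the boundary at byte 4 cuts "cd" in half.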
```java
// get the InputSplit this task should read
InputSplit inputSplit = getSplitDetails(new Path(splitIndex.getSplitLocation()),
    splitIndex.getStartOffset());
updateJobWithSplit(job, inputSplit);
reporter.setInputSplit(inputSplit);

// pick the RecordReader depending on whether bad-record skipping mode is on
RecordReader<INKEY,INVALUE> in = isSkipping() ?
    new SkippingRecordReader<INKEY,INVALUE>(inputSplit, umbilical, reporter) :
    new TrackedRecordReader<INKEY,INVALUE>(inputSplit, job, reporter);
```

Next comes the Map phase. Once the records are read, they are handed to the runner's run method, which calls the user's own map function, as analyzed above.
At the end of the user's map function, output.collect() is typically called to emit the processed key-value pair, and that is where the Collect phase begins.
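The OutputCollector the user's map receives is the OldOutputCollector created in this call:

```java
runner.run(in, new OldOutputCollector(collector, conf), reporter);
```

The main job of the Collect phase is to partition the key-value pairs and write them into a ring (circular) buffer. At this point the data lives only in memory; nothing has been written to disk yet. Quite a few classes are involved in Collect, so here is the structure diagram:

[Figure: class relationships in the Collect phase]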
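OldOutputCollector has a partitioner member that computes each key-value pair's partition number. By default the partition is the key's hash modulo the number of reduce tasks, and you can supply your own implementation. When there is no partitioning (zero reduce tasks), the partition is -1.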
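The default hash rule is small enough to show in full. The sketch below uses the formula of Hadoop's HashPartitioner, (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks, plus the -1 fallback for map-only jobs from OldOutputCollector; the class name HashPartitionSketch is hypothetical. Masking with Integer.MAX_VALUE clears the sign bit, so a negative hashCode still lands in [0, numReduceTasks). Math.abs would not be safe here, because Math.abs(Integer.MIN_VALUE) is still negative.

```java
// Hypothetical helper mirroring the default partitioning rule described above.
final class HashPartitionSketch {
    static int getPartition(Object key, int numReduceTasks) {
        if (numReduceTasks <= 0) {
            // map-only job: like OldOutputCollector's fallback, report "not partitioned"
            return -1;
        }
        // clear the sign bit, then take the remainder
        return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
    }
}
```

Keys with equal hash codes always share a partition: "Aa" and "BB" both hash to 2112, so with three reducers both go to partition 0.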
```java
/**
 * Since the mapred and mapreduce Partitioners don't share a common interface
 * (JobConfigurable is deprecated and a subtype of mapred.Partitioner), the
 * partitioner lives in Old/NewOutputCollector. Note that, for map-only jobs,
 * the configured partitioner should not be called. It's common for
 * partitioners to compute a result mod numReduces, which causes a div0 error
 */
private static class OldOutputCollector<K,V> implements OutputCollector<K,V> {
  private final Partitioner<K,V> partitioner;
  private final MapOutputCollector<K,V> collector;
  private final int numPartitions;

  @SuppressWarnings("unchecked")
  OldOutputCollector(MapOutputCollector<K,V> collector, JobConf conf) {
    numPartitions = conf.getNumReduceTasks();
    if (numPartitions > 0) {
      // more than 0 partitions: instantiate the configured partitioner
      // (hash modulo by default; users can plug in their own).
      // Only the class name is known from the job configuration, hence reflection.
      partitioner = (Partitioner<K,V>)
        ReflectionUtils.newInstance(conf.getPartitionerClass(), conf);
    } else {
      // 0 partitions: no partitioning
      partitioner = new Partitioner<K,V>() {
        @Override
        public void configure(JobConf job) { }
        @Override
        public int getPartition(K key, V value, int numPartitions) {
          // always return -1, meaning "not partitioned"
          return -1;
        }
      };
    }
    this.collector = collector;
  }
  .....
```

The delegating collect method follows. Note that this is still not where the data is actually written:
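```java
  .....
  @Override
  public void collect(K key, V value) throws IOException {
    try {
      // write into memory by partition; partitioner.getPartition computes the
      // partition number (with reduces, the underlying buffer is a ring buffer)
      collector.collect(key, value,
                        partitioner.getPartition(key, value, numPartitions));
    } catch (InterruptedException ie) {
      Thread.currentThread().interrupt();
      throw new IOException("interrupt exception", ie);
    }
  }
```

The collector here is the MapOutputCollector passed into the constructor above. The object exposed to the user is OldOutputCollector, but it only delegates, and MapOutputCollector itself is just an interface: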
```java
interface MapOutputCollector<K, V> {
  public void collect(K key, V value, int partition
                      ) throws IOException, InterruptedException;
  public void close() throws IOException, InterruptedException;
  public void flush() throws IOException, InterruptedException,
                             ClassNotFoundException;
}
```
```java
private <INKEY,INVALUE,OUTKEY,OUTVALUE>
void runOldMapper(final JobConf job,
                  final TaskSplitIndex splitIndex,
                  final TaskUmbilicalProtocol umbilical,
                  TaskReporter reporter
                  ) throws IOException, InterruptedException,
                           ClassNotFoundException {
  ...
  int numReduceTasks = conf.getNumReduceTasks();
  LOG.info("numReduceTasks: " + numReduceTasks);
  MapOutputCollector collector = null;
  if (numReduceTasks > 0) {
    // reduce tasks exist: buffer the output in the MapOutputBuffer ring buffer
    collector = new MapOutputBuffer(umbilical, job, reporter);
  } else {
    // no reduce tasks: write the map output straight to HDFS as the final result
    collector = new DirectMapOutputCollector(umbilical, job, reporter);
  }
  MapRunnable<INKEY,INVALUE,OUTKEY,OUTVALUE> runner =
    ReflectionUtils.newInstance(job.getMapRunnerClass(), job);

  try {
    runner.run(in, new OldOutputCollector(collector, conf), reporter);
    .....
```

There are two cases: with reduce tasks the collector is a MapOutputBuffer; without them it is a DirectMapOutputCollector. This shows how carefully the authors thought things through: with no reduce phase, the output goes straight to HDFS and skips the in-memory sort, spills and merge entirely, which is much more efficient.
In other words, when there are reduce tasks, the collect that finally does the work is MapOutputBuffer.collect.
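Because collect writes data into a ring buffer, the user's writes and the spill thread's reads happen on the same buffer, so extra care is needed to avoid reading inconsistent data. The authors use an approach similar to BlockingQueue: one ReentrantLock with two Conditions, spillDone and spillReady, whose await and signal calls coordinate reads and writes of the buffer.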
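The BlockingQueue-like handshake can be sketched in plain Java. This is a simplified model under stated assumptions, not MapOutputBuffer itself: a buffer region is modeled as a List<Integer>, "spilling" means sorting it into an in-memory list, and shutdown uses a closed flag, whereas the real flush() interrupts the spill thread. It does show the same pattern: one ReentrantLock, a spillReady condition the spill thread waits on, a spillDone condition the collecting thread waits on, and a spill thread that releases the lock while doing the slow work. SpillHandshake and its methods are hypothetical names.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical model of the collect/spill handshake; not Hadoop code.
final class SpillHandshake {
    private final ReentrantLock spillLock = new ReentrantLock();
    private final Condition spillReady = spillLock.newCondition(); // "a region is ready"
    private final Condition spillDone = spillLock.newCondition();  // "previous spill done"
    private List<Integer> pending = null; // region handed to the spill thread; null = idle
    private boolean closed = false;
    private final List<List<Integer>> spills = new ArrayList<>(); // stands in for spill files

    // Collecting side: wait for any running spill, then hand over the next region.
    void startSpill(List<Integer> region) throws InterruptedException {
        spillLock.lock();
        try {
            while (pending != null) {
                spillDone.await();
            }
            pending = region;
            spillReady.signal();
        } finally {
            spillLock.unlock();
        }
    }

    void close() {
        spillLock.lock();
        try {
            closed = true;
            spillReady.signal();
        } finally {
            spillLock.unlock();
        }
    }

    // Spill-thread side, shaped like SpillThread.run(): wait, unlock, spill, relock.
    void spillLoop() {
        spillLock.lock();
        try {
            while (true) {
                while (pending == null && !closed) {
                    spillReady.await();
                }
                if (pending == null) {
                    return; // closed and nothing left to spill
                }
                List<Integer> region = pending;
                spillLock.unlock(); // sort and "write" without holding the lock
                try {
                    List<Integer> sorted = new ArrayList<>(region);
                    Collections.sort(sorted);
                    spills.add(sorted);
                } finally {
                    spillLock.lock();
                    pending = null;
                    spillDone.signalAll();
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            spillLock.unlock();
        }
    }

    static List<List<Integer>> demo() {
        SpillHandshake h = new SpillHandshake();
        Thread spillThread = new Thread(h::spillLoop, "SpillThread");
        spillThread.start();
        try {
            h.startSpill(Arrays.asList(3, 1, 2));
            h.startSpill(Arrays.asList(9, 7)); // may wait on spillDone for the first spill
            h.close();
            spillThread.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return h.spills;
    }
}
```

Every await() sits inside a while loop that rechecks its predicate, so a signal sent before the other thread starts waiting is never lost and spurious wakeups are harmless. demo() returns [[1, 2, 3], [7, 9]].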
```java
.....
private final ReentrantLock spillLock = new ReentrantLock();
private final Condition spillDone = spillLock.newCondition();
private final Condition spillReady = spillLock.newCondition();
.....
```

Next, the collect method:
```java
public synchronized void collect(K key, V value, int partition
                                 ) throws IOException {
  .....
  try {
    // serialize key bytes into buffer
    int keystart = bufindex;
    keySerializer.serialize(key);
    if (bufindex < keystart) {
      // wrapped the key; reset required
      bb.reset();
      keystart = 0;
    }
    // serialize value bytes into buffer
    final int valstart = bufindex;
    valSerializer.serialize(value);
    int valend = bb.markRecord();

    if (partition < 0 || partition >= partitions) {
      throw new IOException("Illegal partition for " + key + " (" +
          partition + ")");
    }
    ....
```
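The kvindices[kvoff + PARTITION], KEYSTART and VALSTART lookups in sortAndSpill hint at how this buffer is organized: serialized bytes live in kvbuffer, a small block of ints per record in kvindices stores the partition, key start and value start, and kvoffsets holds one int per record pointing into kvindices. The sketch below models that layout under simplifying assumptions: it is not circular, has no capacity checks, uses an Integer[] so Arrays.sort can take a comparator, and KvBufferSketch with its helpers is hypothetical. The point it illustrates is that sorting permutes only the small offsets, never the record bytes.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical, non-circular model of the three accounting arrays.
final class KvBufferSketch {
    static final int PARTITION = 0, KEYSTART = 1, VALSTART = 2, ACCTSIZE = 3;

    private final byte[] kvbuffer;     // serialized key and value bytes, back to back
    private final int[] kvindices;     // ACCTSIZE ints per record, in arrival order
    private final Integer[] kvoffsets; // per record: index of its metadata in kvindices
    private int bufindex = 0;          // next free byte in kvbuffer
    private int kvindex = 0;           // number of records collected so far

    KvBufferSketch(int bufferBytes, int maxRecords) {
        kvbuffer = new byte[bufferBytes];
        kvindices = new int[maxRecords * ACCTSIZE];
        kvoffsets = new Integer[maxRecords];
    }

    // No capacity checks: a real buffer would spill instead of overflowing.
    void collect(String key, String value, int partition) {
        int keystart = append(key.getBytes(StandardCharsets.UTF_8));
        int valstart = append(value.getBytes(StandardCharsets.UTF_8));
        int kvoff = kvindex * ACCTSIZE;
        kvindices[kvoff + PARTITION] = partition;
        kvindices[kvoff + KEYSTART] = keystart;
        kvindices[kvoff + VALSTART] = valstart;
        kvoffsets[kvindex++] = kvoff;
    }

    private int append(byte[] bytes) {
        int start = bufindex;
        System.arraycopy(bytes, 0, kvbuffer, bufindex, bytes.length);
        bufindex += bytes.length;
        return start;
    }

    // Sort only the offsets: by partition, then by raw key bytes.
    void sort() {
        Arrays.sort(kvoffsets, 0, kvindex, (a, b) -> {
            int byPartition = Integer.compare(kvindices[a + PARTITION], kvindices[b + PARTITION]);
            return byPartition != 0 ? byPartition : compareKeys(a, b);
        });
    }

    // Unsigned lexicographic comparison of two serialized keys.
    private int compareKeys(int a, int b) {
        int i = kvindices[a + KEYSTART], iEnd = kvindices[a + VALSTART];
        int j = kvindices[b + KEYSTART], jEnd = kvindices[b + VALSTART];
        while (i < iEnd && j < jEnd) {
            int diff = (kvbuffer[i++] & 0xff) - (kvbuffer[j++] & 0xff);
            if (diff != 0) return diff;
        }
        return (iEnd - i) - (jEnd - j);
    }

    // Records in sorted order as "partition:key=value". In this sketch a value ends where
    // the next record (in arrival order) begins, or at bufindex for the last record.
    List<String> dump() {
        List<String> out = new ArrayList<>();
        for (int n = 0; n < kvindex; n++) {
            int kvoff = kvoffsets[n];
            int keystart = kvindices[kvoff + KEYSTART];
            int valstart = kvindices[kvoff + VALSTART];
            int next = kvoff / ACCTSIZE + 1;
            int valend = next < kvindex ? kvindices[next * ACCTSIZE + KEYSTART] : bufindex;
            out.add(kvindices[kvoff + PARTITION] + ":" + text(keystart, valstart)
                    + "=" + text(valstart, valend));
        }
        return out;
    }

    private String text(int from, int to) {
        return new String(kvbuffer, from, to - from, StandardCharsets.UTF_8);
    }
}
```

Collecting b=1 (partition 1), a=2 (0), c=3 (0) and a=4 (1) and then sorting yields 0:a=2, 0:c=3, 1:a=4, 1:b=1, while the bytes in kvbuffer stay exactly where they were written.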
The Spill phase kicks in from time to time while collect is running.
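```java
...
if (kvstart == kvend && kvsoftlimit) {
  LOG.info("Spilling map output: record full = " + kvsoftlimit);
  startSpill();
}
```

kvstart == kvend means no spill is currently in progress: startSpill moves kvend forward to mark the region to spill, and the spill thread waits as long as the two are equal. kvsoftlimit means the record space used since the last spill has passed the soft limit. Only when both hold does collect start a spill, so the spill begins before the buffer is completely full.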
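The trigger can be written out as a worked example. The sketch assumes a ring of `capacity` record slots, treats the records between kvend and the next free slot (kvnext, a name of this sketch) as not yet spilled, and sets the soft limit to capacity * spillPercent; in Hadoop 1.x that fraction comes from io.sort.spill.percent (default 0.80). It is a simplified model of the check, not a copy of MapOutputBuffer's code, and SoftLimitSketch is hypothetical.

```java
// Hypothetical sketch of the "should collect() start a spill?" test.
final class SoftLimitSketch {
    // Record slots in use in a ring of `capacity` slots, from kvend (end of the last
    // region handed to the spill thread) up to kvnext (next free slot). kvnext == kvend
    // reads as "empty"; the spill is meant to start long before the ring is full.
    static int used(int kvend, int kvnext, int capacity) {
        return (kvnext - kvend + capacity) % capacity;
    }

    static boolean shouldStartSpill(int kvstart, int kvend, int kvnext,
                                    int capacity, float spillPercent) {
        int softRecordLimit = (int) (capacity * spillPercent);
        boolean noSpillRunning = kvstart == kvend;
        boolean kvsoftlimit = used(kvend, kvnext, capacity) > softRecordLimit;
        return noSpillRunning && kvsoftlimit;
    }
}
```

With capacity = 100 and spillPercent = 0.8 the soft limit is 80 slots: 81 unspilled records (including the wrapped case kvend = 50, kvnext = 31) start a spill, 80 do not, and nothing starts while kvstart != kvend because a spill is already running.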
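```java
private synchronized void startSpill() {
  LOG.info("bufstart = " + bufstart + "; bufend = " + bufmark +
           "; bufvoid = " + bufvoid);
  LOG.info("kvstart = " + kvstart + "; kvend = " + kvindex +
           "; length = " + kvoffsets.length);
  kvend = kvindex;
  bufend = bufmark;
  spillReady.signal();
}
```

startSpill marks the end of the region to spill (kvend, bufend) and then calls spillReady.signal(), the condition signal that hands work to the spill thread.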
That wakes up SpillThread, which runs the sortAndSpill method:
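```java
spillThreadRunning = true;
try {
  while (true) {
    // wake any writer waiting for the previous spill to finish
    spillDone.signal();
    // no spill requested (kvstart == kvend): wait for startSpill()
    while (kvstart == kvend) {
      spillReady.await();
    }
    try {
      // release the lock so collect() can keep filling the buffer meanwhile
      spillLock.unlock();
      // the buffer has spilled over: sort the region and write it to disk
      sortAndSpill();
      // ...
```

sortAndSpill writes the data to a spill file, sorting it first. If a combiner is configured, it is also run on each partition's records before they are written.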
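In outline, sortAndSpill sorts the buffered region by partition and then by key, writes each partition as a contiguous segment of one spill file, records where each segment starts in a SpillRecord index, and runs the combiner (if any) over each partition's records. The sketch below models those steps on in-memory strings. SpillSketch, its Rec and Segment classes, the tab-separated "file" lines and the word-count style summing combiner are all assumptions for illustration, and segments are measured in lines rather than byte offsets.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Hypothetical model of one spill: sort, write per-partition segments, build an index.
final class SpillSketch {
    static final class Rec {
        final int partition;
        final String key;
        final int value;
        Rec(int partition, String key, int value) {
            this.partition = partition; this.key = key; this.value = value;
        }
    }

    // Stand-in for an index entry: where one partition's segment starts and its length.
    static final class Segment {
        final int start, length;
        Segment(int start, int length) { this.start = start; this.length = length; }
        @Override public String toString() { return start + "+" + length; }
    }

    final List<String> file = new ArrayList<>();   // stands in for the spill file
    final List<Segment> index = new ArrayList<>(); // one entry per partition

    SpillSketch(List<Rec> buffered, int partitions, boolean useCombiner) {
        List<Rec> sorted = new ArrayList<>(buffered);
        // sort by partition first, then by key
        sorted.sort(Comparator.<Rec>comparingInt(r -> r.partition).thenComparing(r -> r.key));
        int spindex = 0;
        for (int p = 0; p < partitions; p++) {
            int segmentStart = file.size();
            while (spindex < sorted.size() && sorted.get(spindex).partition == p) {
                Rec r = sorted.get(spindex++);
                if (!useCombiner) {
                    file.add(r.key + "\t" + r.value); // spill directly
                    continue;
                }
                // combiner (summing): fold the following records that share this key
                int sum = r.value;
                while (spindex < sorted.size() && sorted.get(spindex).partition == p
                        && sorted.get(spindex).key.equals(r.key)) {
                    sum += sorted.get(spindex++).value;
                }
                file.add(r.key + "\t" + sum);
            }
            index.add(new Segment(segmentStart, file.size() - segmentStart));
        }
    }
}
```

For the records (0,b,1), (1,a,1), (0,a,1), (0,b,1), (1,a,1) and two partitions, the plain spill writes five lines with index [0+3, 3+2]; with the combiner the file shrinks to "a 1", "b 2", "a 2" with index [0+2, 2+1].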
```java
private void sortAndSpill() throws IOException, ClassNotFoundException,
                                   InterruptedException {
  ......
  try {
    // create spill file
    final SpillRecord spillRec = new SpillRecord(partitions);
    final Path filename =
        mapOutputFile.getSpillFileForWrite(numSpills, size);
    out = rfs.create(filename);

    final int endPosition = (kvend > kvstart)
      ? kvend
      : kvoffsets.length + kvend;
    // sort before writing
    sorter.sort(MapOutputBuffer.this, kvstart, endPosition, reporter);
    int spindex = kvstart;
    IndexRecord rec = new IndexRecord();
    InMemValBytes value = new InMemValBytes();
    for (int i = 0; i < partitions; ++i) {
      IFile.Writer<K, V> writer = null;
      try {
        long segmentStart = out.getPos();
        writer = new Writer<K, V>(job, out, keyClass, valClass, codec,
                                  spilledRecordsCounter);
        if (combinerRunner == null) {
          // spill directly
          DataInputBuffer key = new DataInputBuffer();
          while (spindex < endPosition &&
              kvindices[kvoffsets[spindex % kvoffsets.length]
                        + PARTITION] == i) {
            final int kvoff = kvoffsets[spindex % kvoffsets.length];
            getVBytesForOffset(kvoff, value);
            key.reset(kvbuffer, kvindices[kvoff + KEYSTART],
                      (kvindices[kvoff + VALSTART] -
                       kvindices[kvoff + KEYSTART]));
            // append the key-value pair through the writer
            writer.append(key, value);
            ++spindex;
          }
        } else {
          int spstart = spindex;
          while (spindex < endPosition &&
              kvindices[kvoffsets[spindex % kvoffsets.length]
                        + PARTITION] == i) {
            ++spindex;
          }
          // Note: we would like to avoid the combiner if we've fewer
          // than some threshold of records for a partition
          // (as written, the combiner runs if this partition has at least one record)
          if (spstart != spindex) {
            combineCollector.setWriter(writer);
            RawKeyValueIterator kvIter =
              new MRResultIterator(spstart, spindex);
            // run the combiner over this partition's records
            combinerRunner.combine(kvIter, combineCollector);
          }
        }
    ......
      // write the spill index to a file
      spillRec.writeToFile(indexFilename, job);
    } else {
      // otherwise keep the spill index cached in memory
      indexCacheList.add(spillRec);
      totalIndexCacheMemory +=
        spillRec.size() * MAP_OUTPUT_INDEX_RECORD_LENGTH;
    }
    LOG.info("Finished spill " + numSpills);
    ++numSpills;
  } finally {
    if (out != null) out.close();
  }
}
```

Each spill produces one spill file, with every partition stored as a consecutive segment, so by the end of the task there may be many spill files. That leads to the last phase of the Map Task, the Combine phase (despite the name, it merges files; it is not the combiner). Its job is to merge all the spill files produced by the previous phase into a single file that the Reduce tasks can read easily. In the code, this happens in collector.flush().
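Because every spill file already holds each partition as a sorted segment, producing the final output amounts to a k-way merge of the segments for each partition, one partition after another. Below is a minimal single-pass sketch using a PriorityQueue of (run, position) cursors. MergeSketch and its list-of-lists layout are assumptions; the real mergeParts works on IFile segments through Hadoop's own merge code and may merge in several passes.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.PriorityQueue;

// Hypothetical model of merging spill files into one output.
final class MergeSketch {
    // k-way merge of already-sorted runs using a min-heap of {run, position} cursors.
    static List<String> mergeRuns(List<List<String>> runs) {
        PriorityQueue<int[]> heap = new PriorityQueue<>(
                (a, b) -> runs.get(a[0]).get(a[1]).compareTo(runs.get(b[0]).get(b[1])));
        for (int r = 0; r < runs.size(); r++) {
            if (!runs.get(r).isEmpty()) heap.add(new int[] {r, 0});
        }
        List<String> out = new ArrayList<>();
        while (!heap.isEmpty()) {
            int[] cur = heap.poll();
            out.add(runs.get(cur[0]).get(cur[1]));
            if (cur[1] + 1 < runs.get(cur[0]).size()) {
                heap.add(new int[] {cur[0], cur[1] + 1}); // advance within the same run
            }
        }
        return out;
    }

    // spills.get(s).get(p) is the sorted segment of partition p in spill file s.
    // The result has one merged, sorted segment per partition: a single output "file".
    static List<List<String>> mergeParts(List<List<List<String>>> spills, int partitions) {
        List<List<String>> output = new ArrayList<>();
        for (int p = 0; p < partitions; p++) {
            List<List<String>> segments = new ArrayList<>();
            for (List<List<String>> spill : spills) segments.add(spill.get(p));
            output.add(mergeRuns(segments));
        }
        return output;
    }
}
```

Merging spill 0 = [[a, c], [x]] with spill 1 = [[b], [w, z]] gives [[a, b, c], [w, x, z]]: one sorted segment per partition in a single output.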
```java
  .....
  try {
    runner.run(in, new OldOutputCollector(collector, conf), reporter);
    // spill whatever is still buffered, then merge all spill files
    collector.flush();
  } finally {
    //close
    in.close();                               // close input
    collector.close();
  }
}
```

When there are reduce tasks, collector.flush() is MapOutputBuffer.flush():
```java
public synchronized void flush() throws IOException, ClassNotFoundException,
                                        InterruptedException {
  ...
  // shut down spill thread and wait for it to exit. Since the preceding
  // ensures that it is finished with its work (and sortAndSpill did not
  // throw), we elect to use an interrupt instead of setting a flag.
  // Spilling simultaneously from this thread while the spill thread
  // finishes its work might be both a useful way to extend this and also
  // sufficient motivation for the latter approach.
  try {
    spillThread.interrupt();
    spillThread.join();
  } catch (InterruptedException e) {
    throw (IOException)new IOException("Spill failed"
        ).initCause(e);
  }
  // release sort buffer before the merge
  kvbuffer = null;
  // finally, merge all spill files into one output file
  mergeParts();
  Path outputPath = mapOutputFile.getOutputFile();
  fileOutputByteCounter.increment(rfs.getFileStatus(outputPath).getLen());
}
```

At this point the Map task is complete. The whole flow really does twist and turn quite a bit.
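While analyzing such an involved process, I kept thinking about how to express my ideas more clearly. Fellow MapReduce learners are welcome to share feedback so we can learn together.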