以前有童鞋問到了這樣一個問題:爲何我在 reduce 階段遍歷了一次 Iterable 以後,再次遍歷的時候,數據都沒了呢?可能有童鞋想固然的回答:Iterable 只能單向遍歷一次,就這樣簡單的緣由。。。事實果然如此嗎? java
仍是用代碼說話: linux
package com.test; import java.util.ArrayList; import java.util.Iterator; import java.util.List; public class T { public static void main(String[] args) { // 只要實現了Iterable接口的對象均可以使用for-each循環。 // Iterable接口只由iterator方法構成, // iterator()方法是java.lang.Iterable接口,被Collection繼承。 /*public interface Iterable<T> { Iterator<T> iterator(); }*/ Iterable<String> iter = new Iterable<String>() { public Iterator<String> iterator() { List<String> l = new ArrayList<String>(); l.add("aa"); l.add("bb"); l.add("cc"); return l.iterator(); } }; for(int count : new int[] {1, 2}){ for (String item : iter) { System.out.println(item); } System.out.println("---------->> " + count + " END."); } } }結果固然是很正常的完整無誤的打印了兩遍 Iterable 的值。那到底是什麼緣由致使了 reduce 階段的 Iterable 只能被遍歷一次呢?
咱們先看一段測試代碼: 面試
測試數據: apache
a 3 a 4 b 50 b 60 a 70 b 8 a 9
import java.io.IOException; import java.util.ArrayList; import java.util.List; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.util.GenericOptionsParser; public class TestIterable { public static class M1 extends Mapper<Object, Text, Text, Text> { private Text oKey = new Text(); private Text oVal = new Text(); String[] lineArr; public void map(Object key, Text value, Context context) throws IOException, InterruptedException { lineArr = value.toString().split(" "); oKey.set(lineArr[0]); oVal.set(lineArr[1]); context.write(oKey, oVal); } } public static class R1 extends Reducer<Text, Text, Text, Text> { List<String> valList = new ArrayList<String>(); List<Text> textList = new ArrayList<Text>(); String strAdd; public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException { valList.clear(); textList.clear(); strAdd = ""; for (Text val : values) { valList.add(val.toString()); textList.add(val); } // 坑之 1 :爲神馬輸出的全是最後一個值?why? for(Text text : textList){ strAdd += text.toString() + ", "; } System.out.println(key.toString() + "\t" + strAdd); System.out.println("......................."); // 我這樣幹呢?對了嗎? strAdd = ""; for(String val : valList){ strAdd += val + ", "; } System.out.println(key.toString() + "\t" + strAdd); System.out.println("----------------------"); // 坑之 2 :第二次遍歷的時候爲何獲得的都是空?why? valList.clear(); strAdd = ""; for (Text val : values) { valList.add(val.toString()); } for(String val : valList){ strAdd += val + ", "; } System.out.println(key.toString() + "\t" + strAdd); System.out.println(">>>>>>>>>>>>>>>>>>>>>>"); } } public static void main(String[] args) throws Exception { Configuration conf = new Configuration(); conf.set("mapred.job.queue.name", "regular"); String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs(); if (otherArgs.length != 2) { System.err.println("Usage: wordcount <in> <out>"); System.exit(2); } System.out.println("------------------------"); Job job = new Job(conf, "TestIterable"); job.setJarByClass(TestIterable.class); job.setMapperClass(M1.class); job.setReducerClass(R1.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(Text.class); // 輸入輸出路徑 FileInputFormat.addInputPath(job, new Path(otherArgs[0])); FileSystem.get(conf).delete(new Path(otherArgs[1]), true); FileOutputFormat.setOutputPath(job, new Path(otherArgs[1])); System.exit(job.waitForCompletion(true) ? 0 : 1); } }在 Eclipse 控制檯中的結果以下:
a 9, 9, 9, 9, ....................... a 3, 4, 70, 9, ---------------------- a >>>>>>>>>>>>>>>>>>>>>> b 8, 8, 8, ....................... b 50, 60, 8, ---------------------- b >>>>>>>>>>>>>>>>>>>>>>關於第 1 個坑:對象重用( objects reuse )
reduce方法的javadoc中已經說明了會出現的問題: windows
The framework calls this method for each <key, (list of values)> pair in the grouped inputs. Output values must be of the same type as input values. Input keys must not be altered. The framework will reuse the key and value objects that are passed into the reduce, therefore the application should clone the objects they want to keep a copy of. 性能優化
也就是說雖然reduce方法會反覆執行屢次,但key和value相關的對象只有兩個,reduce會反覆重用這兩個對象。因此若是要保存key或者value的結果,只能將其中的值取出另存或者從新clone一個對象(例如Text store = new Text(value) 或者 String a = value.toString()),而不能直接賦引用。由於引用從始至終都是指向同一個對象,你若是直接保存它們,那最後它們都指向最後一個輸入記錄。會影響最終計算結果而出錯。 app
看到這裏,我想你會恍然大悟:這不是剛畢業找工做,面試官常問的問題:String 是不可變對象但爲何能相加呢?爲何字符串相加不提倡用 String,而用 StringBuilder ?若是你還不清楚這個問題怎麼回答,建議你看看這篇《深刻理解 String, StringBuffer 與 StringBuilder 的區別》http://my.oschina.net/leejun2005/blog/102377 框架
關於第 2 個坑:http://stackoverflow.com/questions/6111248/iterate-twice-on-values eclipse
The Iterator you receive from that Iterable's iterator() method is special. The values may not all be in memory; Hadoop may be streaming them from disk. They aren't really backed by a Collection, so it's nontrivial to allow multiple iterations. oop
最後想說明的是:hadoop 框架的做者們真的是考慮很周全,在 hadoop 框架中,不只有對象重用,還有 JVM 重用等,節約一切能夠節約的資源,提升一切能夠提升的性能。由於在這種海量數據處理的場景下,性能優化是很是重要的,你可能處理100條數據體現不出性能差異,可是你面對的是千億、萬億級別的數據呢?
PS:
個人代碼是在 Eclipse 中遠程調試的,因此 reduce 是沒有寫 hdfs 的,直接在 eclipse 終端上能夠看到結果,很方便,關於怎麼在 windows 上遠程調試 hadoop,請參考這裏 《實戰 windows7 下 eclipse 遠程調試 linux hadoop》http://my.oschina.net/leejun2005/blog/122775
REF:
hadoop中迭代器的對象重用問題
http://paddy-w.iteye.com/blog/1514595
關於 hadoop 中 JVM 重用和對象重用的介紹