Hadoop maintains a number of built-in counters for every job to describe the job's various metrics. For example, some counters record the number of bytes and the number of records processed, which lets users monitor how much input data has been consumed and how much output data has been produced, and tune the job accordingly.
14/06/08 15:13:35 INFO mapreduce.Job: Counters: 46
	File System Counters
		FILE: Number of bytes read=159
		FILE: Number of bytes written=159447
		FILE: Number of read operations=0
		FILE: Number of large read operations=0
		FILE: Number of write operations=0
		HDFS: Number of bytes read=198
		HDFS: Number of bytes written=35
		HDFS: Number of read operations=6
		HDFS: Number of large read operations=0
		HDFS: Number of write operations=2
	Job Counters
		Launched map tasks=1
		Launched reduce tasks=1
		Rack-local map tasks=1
		Total time spent by all maps in occupied slots (ms)=3896
		Total time spent by all reduces in occupied slots (ms)=9006
	Map-Reduce Framework
		Map input records=3
		Map output records=12
		Map output bytes=129
		Map output materialized bytes=159
		Input split bytes=117
		Combine input records=0
		Combine output records=0
		Reduce input groups=4
		Reduce shuffle bytes=159
		Reduce input records=12
		Reduce output records=4
		Spilled Records=24
		Shuffled Maps =1
		Failed Shuffles=0
		Merged Map outputs=1
		GC time elapsed (ms)=13
		CPU time spent (ms)=3830
		Physical memory (bytes) snapshot=537718784
		Virtual memory (bytes) snapshot=7365263360
		Total committed heap usage (bytes)=2022309888
	Shuffle Errors
		BAD_ID=0
		CONNECTION=0
		IO_ERROR=0
		WRONG_LENGTH=0
		WRONG_MAP=0
		WRONG_REDUCE=0
	File Input Format Counters
		Bytes Read=81
	File Output Format Counters
		Bytes Written=35

Counters are maintained by their associated tasks and are periodically sent to the tasktracker, which in turn forwards them to the jobtracker, so counters can be aggregated globally. See the section "Progress and Status Updates" on page 170 of Hadoop: The Definitive Guide for details. Unlike other counters (including user-defined ones), the built-in job counters are actually maintained by the jobtracker itself, so they do not need to be sent across the network.
Notice 1: Note that different Hadoop versions define counters in slightly different ways.
(1) In 0.20.x, using a counter is straightforward: just reference it directly, and if the counter does not exist yet, Hadoop creates it automatically.
Counter ct = context.getCounter("INPUT_WORDS", "count");
ct.increment(1);

(2) In 0.19.x, you need to define an enum:
// Old (0.19.x) API: declare the counter as an enum
enum MyCounter { INPUT_WORDS };

// inside the mapper/reducer, increment via the Reporter
reporter.incrCounter(MyCounter.INPUT_WORDS, 1);

// on the client side, read the counter back after the job finishes
RunningJob job = JobClient.runJob(conf);
Counters c = job.getCounters();
long cnt = c.getCounter(MyCounter.INPUT_WORDS);
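For completeness, with the 0.20.x+ API the string-named counter from snippet (1) can be read back on the client side in much the same way once the job has finished. A minimal sketch (not from the original post), assuming job is the org.apache.hadoop.mapreduce.Job instance that was just run:

// after job.waitForCompletion(true) has returned
Counters counters = job.getCounters();
// look the counter up by the same group name and counter name used in context.getCounter(...)
long count = counters.findCounter("INPUT_WORDS", "count").getValue();
System.out.println("INPUT_WORDS.count = " + count);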
Notice 2: When using counters, be aware that they are all stored in the jobtracker's memory. Mapper/Reducer tasks serialize them and send them along with status updates. To keep things running smoothly without straining the jobtracker, the number of counters should stay in the range of roughly 10 to 100; counters are not meant solely for aggregating a MapReduce job's statistics. Newer versions of Hadoop cap the number of counters to protect the jobtracker. The last thing you want is to bring the jobtracker down by defining hundreds of counters.
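If a job legitimately needs more counters than the default cap allows, the limit can usually be raised via configuration. A hedged sketch, assuming Hadoop 2.x, where the property is commonly named mapreduce.job.counters.max (older releases used mapreduce.job.counters.limit; check your version's documentation, and note that the cap may also be enforced on the cluster side, so a client-only setting may not be enough):

Configuration conf = new Configuration();
// assumption: Hadoop 2.x property name; older versions use mapreduce.job.counters.limit
conf.setInt("mapreduce.job.counters.max", 200);
Job job = new Job(conf, "job-with-many-counters");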
Now let's look at a concrete counter example (please run the following code on version 0.20.1 or later):
The test input (the contents of /tmp/dsap/rawdata/june/a.txt, three lines):

hello world 2013 mapreduce
hello world 2013 mapreduce
hello world 2013 mapreduce
/**
 * Project Name: CDHJobs
 * File Name: MapredCounter.java
 * Package Name: tmp
 * Date: 2014-6-8 2:12:48 PM
 * Copyright (c) 2014, decli#qq.com All Rights Reserved.
 */
package tmp;

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Counter;
import org.apache.hadoop.mapreduce.CounterGroup;
import org.apache.hadoop.mapreduce.Counters;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class WordCountWithCounter {

    static enum WordsNature {
        STARTS_WITH_DIGIT, STARTS_WITH_LETTER, ALL
    }

    /**
     * The map class of WordCount.
     */
    public static class TokenCounterMapper extends Mapper<Object, Text, Text, IntWritable> {

        private final static IntWritable one = new IntWritable(1);
        private Text word = new Text();

        public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
            StringTokenizer itr = new StringTokenizer(value.toString());
            while (itr.hasMoreTokens()) {
                word.set(itr.nextToken());
                context.write(word, one);
            }
        }
    }

    /**
     * The reducer class of WordCount.
     */
    public static class TokenCounterReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

        public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException,
                InterruptedException {
            int sum = 0;
            String token = key.toString();
            if (StringUtils.isNumeric(token)) {
                context.getCounter(WordsNature.STARTS_WITH_DIGIT).increment(1);
            } else if (StringUtils.isAlpha(token)) {
                context.getCounter(WordsNature.STARTS_WITH_LETTER).increment(1);
            }
            context.getCounter(WordsNature.ALL).increment(1);
            for (IntWritable value : values) {
                sum += value.get();
            }
            context.write(key, new IntWritable(sum));
        }
    }

    /**
     * The main entry point.
     */
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = new Job(conf, "WordCountWithCounter");
        job.setJarByClass(WordCountWithCounter.class);
        job.setMapperClass(TokenCounterMapper.class);
        job.setReducerClass(TokenCounterReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        FileInputFormat.addInputPath(job, new Path("/tmp/dsap/rawdata/june/a.txt"));
        FileOutputFormat.setOutputPath(job, new Path("/tmp/dsap/rawdata/june/a_result"));
        int exitCode = job.waitForCompletion(true) ? 0 : 1;

        Counters counters = job.getCounters();
        Counter c1 = counters.findCounter(WordsNature.STARTS_WITH_DIGIT);
        System.out.println("-------------->>>>: " + c1.getDisplayName() + ": " + c1.getValue());

        // The loop below shows how to enumerate the built-in counter groups that Hadoop provides.
        for (CounterGroup group : counters) {
            System.out.println("==========================================================");
            System.out.println("* Counter Group: " + group.getDisplayName() + " (" + group.getName() + ")");
            System.out.println("  number of counters in this group: " + group.size());
            for (Counter counter : group) {
                System.out.println("  ++++ " + counter.getDisplayName() + ": " + counter.getName() + ": " + counter.getValue());
            }
        }
        System.exit(exitCode);
    }
}
The run results are given together below. Counters have the notion of a "group": a group holds all counter values that logically belong to the same scope. The default counters provided by a MapReduce job fall into seven groups, introduced one by one below. The test data above is used for a detailed comparison; for the individual counters I will pick out the main ones and describe them briefly.
... the preceding job run log (xx characters) is omitted ...
ALL=4
STARTS_WITH_DIGIT=1
STARTS_WITH_LETTER=3
-------------->>>>: STARTS_WITH_DIGIT: 1
==========================================================
# A MapReduce job may depend on data from different file systems; this group records the job's read/write interaction with each file system.
* Counter Group: File System Counters (org.apache.hadoop.mapreduce.FileSystemCounter)
  number of counters in this group: 10
# Bytes the job read from the local file system. Assuming all of our map input comes from HDFS, this should be 0 during the map phase. Before a reduce runs, however, its input is the shuffle-merged data stored on the reduce side's local disk, so this value is the total input bytes of all reduces.
  ++++ FILE: Number of bytes read: FILE_BYTES_READ: 159
# Intermediate map results are spilled to local disk and merged into a final spill file once the map finishes, so on the map side this is the total number of bytes the map tasks wrote to local disk. Correspondingly, during the shuffle the reduce side keeps pulling the map-side intermediate results, merging them and spilling them to its own local disk, eventually producing a single file that becomes the reduce's input.
  ++++ FILE: Number of bytes written: FILE_BYTES_WRITTEN: 159447
  ++++ FILE: Number of read operations: FILE_READ_OPS: 0
  ++++ FILE: Number of large read operations: FILE_LARGE_READ_OPS: 0
  ++++ FILE: Number of write operations: FILE_WRITE_OPS: 0
# During the whole job, data is read from HDFS only while the maps run, and that data is not limited to the source file contents; it also includes the split metadata of all maps. So this value should be slightly larger than FileInputFormatCounters.BYTES_READ.
  ++++ HDFS: Number of bytes read: HDFS_BYTES_READ: 198
# The final reduce results are all written to HDFS; this is the total output of the job.
  ++++ HDFS: Number of bytes written: HDFS_BYTES_WRITTEN: 35
  ++++ HDFS: Number of read operations: HDFS_READ_OPS: 6
  ++++ HDFS: Number of large read operations: HDFS_LARGE_READ_OPS: 0
  ++++ HDFS: Number of write operations: HDFS_WRITE_OPS: 2
==========================================================
# This group describes statistics related to job scheduling.
* Counter Group: Job Counters (org.apache.hadoop.mapreduce.JobCounter)
  number of counters in this group: 5
# Incremented when the job, as it is scheduled, launches a data-local map task (a replica of the source file lives on the tasktracker that executes the map task).
  ++++ Data-local map tasks
# How much time in total the job kept slots reserved for map/reduce tasks that had not yet started.
  ++++ FALLOW_SLOTS_MILLIS_MAPS/REDUCES
# Total time all map/reduce tasks occupied their slots, including execution time and the time spent creating/destroying the child JVMs.
  ++++ SLOTS_MILLIS_MAPS/REDUCES
# How many map tasks this job launched.
  ++++ Launched map tasks: TOTAL_LAUNCHED_MAPS: 1
# How many reduce tasks this job launched.
  ++++ Launched reduce tasks: TOTAL_LAUNCHED_REDUCES: 1
  ++++ Rack-local map tasks: RACK_LOCAL_MAPS: 1
  ++++ Total time spent by all maps in occupied slots (ms): SLOTS_MILLIS_MAPS: 3896
  ++++ Total time spent by all reduces in occupied slots (ms): SLOTS_MILLIS_REDUCES: 9006
==========================================================
# This counter group contains quite a lot of detail about job execution. A useful mental model: a "record" normally means one line of data, while the corresponding "bytes" value is the size of that data; a "group" here means a post-merge reduce input of the form {"aaa", [5, 8, 2, ...]}.
* Counter Group: Map-Reduce Framework (org.apache.hadoop.mapreduce.TaskCounter)
  number of counters in this group: 20
# Total number of lines all map tasks read from HDFS.
  ++++ Map input records: MAP_INPUT_RECORDS: 3
# Number of records the map tasks emitted directly, i.e. the number of context.write calls in the map method: the raw output count before any Combiner runs.
  ++++ Map output records: MAP_OUTPUT_RECORDS: 12
# Map output key/value pairs are serialized into an in-memory buffer, so these bytes are the total after serialization.
  ++++ Map output bytes: MAP_OUTPUT_BYTES: 129
  ++++ Map output materialized bytes: MAP_OUTPUT_MATERIALIZED_BYTES: 159
# Data related to a map task's split is stored in HDFS, and the stored metadata records how the data was compressed and what its concrete type is. This extra information is added by the MapReduce framework and has nothing to do with the job itself; the value here is the size in bytes of that extra information.
  ++++ Input split bytes: SPLIT_RAW_BYTES: 117
# The Combiner exists to minimize the amount of data that has to be pulled and moved, so the number of combine input records equals the number of map output records.
  ++++ Combine input records: COMBINE_INPUT_RECORDS: 0
# After the Combiner runs, records with the same key are collapsed, resolving much of the duplication on the map side; this is the final number of entries in the map-side intermediate file.
  ++++ Combine output records: COMBINE_OUTPUT_RECORDS: 0
# How many such groups the reduces read in total.
  ++++ Reduce input groups: REDUCE_INPUT_GROUPS: 4
# How much intermediate data the reduce-side copy threads fetched from the map side in total, i.e. the combined size of all map tasks' final intermediate files.
  ++++ Reduce shuffle bytes: REDUCE_SHUFFLE_BYTES: 159
# With a Combiner, this equals the number of records left after the map-side Combiner ran; without one, it should equal the number of map output records.
  ++++ Reduce input records: REDUCE_INPUT_RECORDS: 12
# Total number of records written by all reduces.
  ++++ Reduce output records: REDUCE_OUTPUT_RECORDS: 4
# Spilling happens on both the map and the reduce side; this counts how many records were spilled from memory to disk in total.
  ++++ Spilled Records: SPILLED_RECORDS: 24
# Almost every reduce has to pull data from every map; each time a copy thread successfully fetches one map's output this increases by 1, so the total is roughly reduce number * map number.
  ++++ Shuffled Maps : SHUFFLED_MAPS: 1
# Number of shuffle errors caused by network or I/O exceptions while the copy threads were fetching map-side intermediate data.
  ++++ Failed Shuffles: FAILED_SHUFFLE: 0
# How many merge operations took place during the shuffle.
  ++++ Merged Map outputs: MERGED_MAP_OUTPUTS: 1
# Total GC time of the child JVMs that ran the maps and reduces, obtained via JMX.
  ++++ GC time elapsed (ms): GC_TIME_MILLIS: 13
  ++++ CPU time spent (ms): CPU_MILLISECONDS: 3830
  ++++ Physical memory (bytes) snapshot: PHYSICAL_MEMORY_BYTES: 537718784
  ++++ Virtual memory (bytes) snapshot: VIRTUAL_MEMORY_BYTES: 7365263360
  ++++ Total committed heap usage (bytes): COMMITTED_HEAP_BYTES: 2022309888
==========================================================
# This group records how often each kind of error occurred during the shuffle, essentially errors hit by the copy threads while fetching map-side intermediate data.
* Counter Group: Shuffle Errors (Shuffle Errors)
  number of counters in this group: 6
# Every map has an ID such as attempt_201109020150_0254_m_000000_0; if the ID in the metadata fetched by a reduce copy thread is not in the standard format, this counter is incremented.
  ++++ BAD_ID: BAD_ID: 0
# The copy thread failed to establish a connection to the map side.
  ++++ CONNECTION: CONNECTION: 0
# Incremented whenever a reduce copy thread hits an IOException while fetching map-side data.
  ++++ IO_ERROR: IO_ERROR: 0
# The map-side intermediate result is compressed, formatted data, so it carries two length fields: the original size and the compressed size. If either length is transmitted incorrectly (e.g. negative), this counter is incremented.
  ++++ WRONG_LENGTH: WRONG_LENGTH: 0
# Each copy thread has a purpose: fetching certain maps' intermediate results for a certain reduce. If the map data being fetched is not one of the maps the copy thread was assigned, the wrong data was pulled.
  ++++ WRONG_MAP: WRONG_MAP: 0
# Same idea as above: if the fetched data turns out not to be destined for this reduce, the wrong data was pulled.
  ++++ WRONG_REDUCE: WRONG_REDUCE: 0
==========================================================
# This group records what the map tasks read as file content (the total input data).
* Counter Group: File Input Format Counters (org.apache.hadoop.mapreduce.lib.input.FileInputFormatCounter)
  number of counters in this group: 1
# All input data (in bytes) of the map tasks, i.e. the total size of all value arguments passed to the map methods.
  ++++ Bytes Read: BYTES_READ: 81
==========================================================
# This group records what the reduce tasks wrote as file content (the total output data).
* Counter Group: File Output Format Counters (org.apache.hadoop.mapreduce.lib.output.FileOutputFormatCounter)
  number of counters in this group: 1
  ++++ Bytes Written: BYTES_WRITTEN: 35
==========================================================
# Statistics for the user-defined counters.
* Counter Group: tmp.WordCountWithCounter$WordsNature (tmp.WordCountWithCounter$WordsNature)
  number of counters in this group: 3
  ++++ ALL: ALL: 4
  ++++ STARTS_WITH_DIGIT: STARTS_WITH_DIGIT: 1
  ++++ STARTS_WITH_LETTER: STARTS_WITH_LETTER: 3
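The built-in counters shown in the listing above can also be read programmatically on the client side, not just from the log. A minimal sketch, assuming Hadoop 2.x, where the framework counters are exposed as the org.apache.hadoop.mapreduce.TaskCounter and JobCounter enums (the enum class names differ in older versions):

// imports assumed: org.apache.hadoop.mapreduce.TaskCounter, org.apache.hadoop.mapreduce.JobCounter
Counters counters = job.getCounters();  // after job.waitForCompletion(true)

// framework counters, looked up via their enum constants
long mapInputRecords  = counters.findCounter(TaskCounter.MAP_INPUT_RECORDS).getValue();
long reduceOutRecords = counters.findCounter(TaskCounter.REDUCE_OUTPUT_RECORDS).getValue();
long launchedMaps     = counters.findCounter(JobCounter.TOTAL_LAUNCHED_MAPS).getValue();

// the same lookup by group name and counter name, using the names printed in the listing above
long hdfsBytesRead = counters
        .findCounter("org.apache.hadoop.mapreduce.FileSystemCounter", "HDFS_BYTES_READ")
        .getValue();

System.out.println("map input=" + mapInputRecords + ", reduce output=" + reduceOutRecords
        + ", launched maps=" + launchedMaps + ", HDFS bytes read=" + hdfsBytesRead);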
What if you want a counter-like "global variable" in MapReduce, one whose value can be of any data type, modified freely in map and reduce, and then read back in the main function?
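Counters themselves only aggregate long values, so they cannot carry an arbitrary data type on their own. One commonly used workaround, sketched below as an illustration rather than as the article's own answer, is to have each task write its value to a side file on HDFS in cleanup() and to read those files back in main() after waitForCompletion(); the path /tmp/dsap/rawdata/june/side_output and the longestTokenLength field are hypothetical:

import java.io.IOException;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

// nested inside the driver class, like TokenCounterReducer above
public static class SideOutputReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
    private long longestTokenLength = 0;  // arbitrary state accumulated during reduce()

    @Override
    public void reduce(Text key, Iterable<IntWritable> values, Context context) {
        longestTokenLength = Math.max(longestTokenLength, key.getLength());
    }

    @Override
    protected void cleanup(Context context) throws IOException, InterruptedException {
        FileSystem fs = FileSystem.get(context.getConfiguration());
        // hypothetical side-file location; one file per task attempt avoids name collisions
        Path out = new Path("/tmp/dsap/rawdata/june/side_output/" + context.getTaskAttemptID());
        try (FSDataOutputStream os = fs.create(out)) {
            os.writeUTF("longestTokenLength=" + longestTokenLength);
        }
    }
}

// In main(), after job.waitForCompletion(true), list the files under
// /tmp/dsap/rawdata/june/side_output, read them back and combine their contents.

Note that speculative execution and task retries can leave extra attempt files behind, so main() has to reconcile them (or speculative execution has to be disabled); this is one reason plain counters remain the preferred mechanism whenever a long value is enough. The links below provide further background.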
(1) An Example of Hadoop MapReduce Counter
http://diveintodata.org/2011/03/15/an-example-of-hadoop-mapreduce-counter/
(2) Hadoop Tutorial Series, Issue #3: Counters In Action
http://www.philippeadjiman.com/blog/2010/01/07/hadoop-tutorial-series-issue-3-counters-in-action/
(3) Controlling Hadoop MapReduce Job recursion
http://codingwiththomas.blogspot.com/2011/04/controlling-hadoop-job-recursion.html
(4) MapReduce Design Patterns (chapter 2, part 3) (IV)
http://blog.csdn.net/cuirong1986/article/details/8456923
(5) [Hadoop source code reading][5]: Counter usage and the meaning of the default counters
http://www.cnblogs.com/xuxm2007/archive/2012/06/15/2551030.html