Using Counters in MapReduce

    MapReduce Counters give us a window into the detailed runtime data of a MapReduce job. MapReduce ships with many built-in Counters.

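    Counters are organized into groups; a group collects all the values that logically belong to the same scope. The default Counters provided by a MapReduce job fall into three groups: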

  1. Map-Reduce Framework
    Map input records, Map skipped records, Map input bytes, Map output records, Map output bytes, Combine input records, Combine output records, Reduce input records, Reduce input groups, Reduce output records, Reduce skipped groups, Reduce skipped records, Spilled records
  2. File Systems
    FileSystem bytes read, FileSystem bytes written
  3. Job Counters
    Launched map tasks, Launched reduce tasks, Failed map tasks, Failed reduce tasks, Data-local map tasks, Rack-local map tasks, Other local map tasks

    You can see these counters in the Web UI, or in the summary report printed to the console after the job finishes. See the following MR run log:


-bash-4.1$ hadoop jar mr.jar com.catt.cdh.mr.CountRecords
13/11/29 11:38:04 WARN conf.Configuration: fs.default.name is deprecated. Instead, use fs.defaultFS
13/11/29 11:38:10 WARN mapred.JobClient: Use GenericOptionsParser for parsing the arguments. Applications should implement Tool for the same.
13/11/29 11:38:11 INFO input.FileInputFormat: Total input paths to process : 1
13/11/29 11:38:11 INFO lzo.GPLNativeCodeLoader: Loaded native gpl library
13/11/29 11:38:11 INFO lzo.LzoCodec: Successfully loaded & initialized native-lzo library [hadoop-lzo rev 6298911ef75545c61859c08add6a74a83e0183ad]
13/11/29 11:38:12 INFO mapred.JobClient: Running job: job_201311251130_0208
13/11/29 11:38:13 INFO mapred.JobClient:  map 0% reduce 0%
13/11/29 11:38:40 INFO mapred.JobClient:  map 100% reduce 0%
13/11/29 11:38:49 INFO mapred.JobClient:  map 100% reduce 100%
13/11/29 11:38:57 INFO mapred.JobClient: Job complete: job_201311251130_0208
13/11/29 11:38:57 INFO mapred.JobClient: Counters: 32
13/11/29 11:38:57 INFO mapred.JobClient:   File System Counters
13/11/29 11:38:57 INFO mapred.JobClient:     FILE: Number of bytes read=36
13/11/29 11:38:57 INFO mapred.JobClient:     FILE: Number of bytes written=322478
13/11/29 11:38:57 INFO mapred.JobClient:     FILE: Number of read operations=0
13/11/29 11:38:57 INFO mapred.JobClient:     FILE: Number of large read operations=0
13/11/29 11:38:57 INFO mapred.JobClient:     FILE: Number of write operations=0
13/11/29 11:38:57 INFO mapred.JobClient:     HDFS: Number of bytes read=139
13/11/29 11:38:57 INFO mapred.JobClient:     HDFS: Number of bytes written=7
13/11/29 11:38:57 INFO mapred.JobClient:     HDFS: Number of read operations=2
13/11/29 11:38:57 INFO mapred.JobClient:     HDFS: Number of large read operations=0
13/11/29 11:38:57 INFO mapred.JobClient:     HDFS: Number of write operations=1
13/11/29 11:38:57 INFO mapred.JobClient:   Job Counters 
13/11/29 11:38:57 INFO mapred.JobClient:     Launched map tasks=1
13/11/29 11:38:57 INFO mapred.JobClient:     Launched reduce tasks=1
13/11/29 11:38:57 INFO mapred.JobClient:     Data-local map tasks=1
13/11/29 11:38:57 INFO mapred.JobClient:     Total time spent by all maps in occupied slots (ms)=31068
13/11/29 11:38:57 INFO mapred.JobClient:     Total time spent by all reduces in occupied slots (ms)=6671
13/11/29 11:38:57 INFO mapred.JobClient:     Total time spent by all maps waiting after reserving slots (ms)=0
13/11/29 11:38:57 INFO mapred.JobClient:     Total time spent by all reduces waiting after reserving slots (ms)=0
13/11/29 11:38:57 INFO mapred.JobClient:   Map-Reduce Framework
13/11/29 11:38:57 INFO mapred.JobClient:     Map input records=13
13/11/29 11:38:57 INFO mapred.JobClient:     Map output records=1
13/11/29 11:38:57 INFO mapred.JobClient:     Map output bytes=14
13/11/29 11:38:57 INFO mapred.JobClient:     Input split bytes=103
13/11/29 11:38:57 INFO mapred.JobClient:     Combine input records=0
13/11/29 11:38:57 INFO mapred.JobClient:     Combine output records=0
13/11/29 11:38:57 INFO mapred.JobClient:     Reduce input groups=1
13/11/29 11:38:57 INFO mapred.JobClient:     Reduce shuffle bytes=32
13/11/29 11:38:57 INFO mapred.JobClient:     Reduce input records=1
13/11/29 11:38:57 INFO mapred.JobClient:     Reduce output records=1
13/11/29 11:38:57 INFO mapred.JobClient:     Spilled Records=2
13/11/29 11:38:57 INFO mapred.JobClient:     CPU time spent (ms)=4780
13/11/29 11:38:57 INFO mapred.JobClient:     Physical memory (bytes) snapshot=657629184
13/11/29 11:38:57 INFO mapred.JobClient:     Virtual memory (bytes) snapshot=3802001408
13/11/29 11:38:57 INFO mapred.JobClient:     Total committed heap usage (bytes)=1915486208
13/11/29 11:38:57 INFO mr.CountRecords: sum     13
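
The console report above has a regular shape: each group name appears on its own line, followed by indented `name=value` lines for the counters in that group. As a rough illustration of that group/counter structure, here is a minimal sketch in plain Java that turns such log lines into a nested map. The class name `CounterLogParser` and the method `parse` are hypothetical, and the indentation rule (three or more spaces after `mapred.JobClient:`) is an assumption read off this particular log, not a documented format.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class CounterLogParser {
    // Prefix printed by JobClient in the log above; the payload follows it.
    private static final String MARKER = "mapred.JobClient:";

    /** Returns group name -> (counter name -> value), in the order seen in the log. */
    public static Map<String, Map<String, Long>> parse(List<String> lines) {
        Map<String, Map<String, Long>> groups = new LinkedHashMap<>();
        Map<String, Long> current = null;
        for (String line : lines) {
            int at = line.indexOf(MARKER);
            if (at < 0) {
                continue; // not a JobClient line
            }
            String payload = line.substring(at + MARKER.length());
            int indent = 0;
            while (indent < payload.length() && payload.charAt(indent) == ' ') {
                indent++;
            }
            if (indent < 3) {
                continue; // progress lines such as " map 100% reduce 0%" (assumption)
            }
            String text = payload.trim();
            int eq = text.lastIndexOf('=');
            if (eq < 0) {
                // A line without '=' starts a new group, e.g. "Job Counters"
                current = new LinkedHashMap<>();
                groups.put(text, current);
            } else if (current != null) {
                // A counter line, e.g. "Map input records=13"
                current.put(text.substring(0, eq), Long.parseLong(text.substring(eq + 1)));
            }
        }
        return groups;
    }

    public static void main(String[] args) {
        List<String> sample = Arrays.asList(
            "13/11/29 11:38:57 INFO mapred.JobClient:   Map-Reduce Framework",
            "13/11/29 11:38:57 INFO mapred.JobClient:     Map input records=13",
            "13/11/29 11:38:57 INFO mapred.JobClient:     Map output records=1");
        System.out.println(parse(sample));
    }
}
```

Inside a driver program it is simpler to read the values from `job.getCounters()` directly; parsing the log is only useful when the console output is all that remains of a run.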


Defining custom Counters with a Java Enum

A Counter can be any Enum type, as the following code example shows:

import java.io.IOException;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Counter;
import org.apache.hadoop.mapreduce.Counters;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

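/*
 * Defining custom Counters with a Java Enum.
 * A Counter can be any Enum type.
 * For example, given a file in which each line records the duration of one
 * online session of a user, the code below counts the sessions longer than
 * 30 minutes and the sessions of 30 minutes or less.
 * The final counts are printed to the terminal.
 */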
public class CounterTest extends Configured implements Tool {
	private final static Log log = LogFactory.getLog(CounterTest.class);

	public static void main(String[] args) throws Exception {
		String[] ars = new String[] { "hdfs://data2.kt:8020/test/input",
				"hdfs://data2.kt:8020/test/output" };
		int exitcode = ToolRunner.run(new CounterTest(), ars);
		System.exit(exitcode);
	}

	public int run(String[] args) throws Exception {
		Configuration conf = getConf();
		// fs.default.name is deprecated (see the WARN line in the job log); use fs.defaultFS
		conf.set("fs.defaultFS", "hdfs://data2.kt:8020/");
		FileSystem fs = FileSystem.get(conf);
		fs.delete(new Path(args[1]), true);

		// Pass conf to the job; a bare new Job() would ignore the settings made above
		Job job = new Job(conf);
		job.setJarByClass(CounterTest.class);

		job.setMapperClass(MyMap.class);
		job.setNumReduceTasks(0);

		job.setOutputKeyClass(NullWritable.class);
		job.setOutputValueClass(Text.class);

		FileInputFormat.addInputPath(job, new Path(args[0]));
		FileOutputFormat.setOutputPath(job, new Path(args[1]));

		int result = job.waitForCompletion(true) ? 0 : 1;
		
		// Read the Counter result back after the job has finished and log it
		Counters counters = job.getCounters();
		Counter counter1=counters.findCounter(NetTimeLong.OVER30M);
		log.info(counter1.getValue());
		log.info(counter1.getDisplayName()+","+counter1.getName());
		
		return result;
	}

	public static class MyMap extends
			Mapper<LongWritable, Text, NullWritable, Text> {
		private Counter counter1, counter2;

		@Override
		public void map(LongWritable key, Text value, Context context)
				throws IOException, InterruptedException {
			// Each input line holds one session duration in minutes
			double minutes = Double.parseDouble(value.toString());
			if (minutes <= 30) {
				// getCounter creates the counter automatically if it does not exist yet
				counter2 = context.getCounter(NetTimeLong.LOW30M);
				counter2.increment(1);
			} else if (minutes > 30) {
				counter1 = context.getCounter(NetTimeLong.OVER30M);
				counter1.increment(1);
			}
			context.write(NullWritable.get(), value);
		}
	}
}

enum NetTimeLong {
	OVER30M, LOW30M
}
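
The branching inside the mapper can be checked locally before submitting anything to a cluster. The following is a minimal sketch that uses a plain `EnumMap` as a stand-in for Hadoop's counters; it does not use the Hadoop API at all. The class name `LocalCounterSketch` and the method `tally` are hypothetical, and the enum is redeclared as a nested type only to keep the example self-contained.

```java
import java.util.Arrays;
import java.util.EnumMap;
import java.util.List;
import java.util.Map;

public class LocalCounterSketch {
    // Same constants as the NetTimeLong enum used by the job above
    public enum NetTimeLong { OVER30M, LOW30M }

    /** Applies the mapper's threshold rule to each line and tallies per enum constant. */
    public static Map<NetTimeLong, Long> tally(List<String> lines) {
        Map<NetTimeLong, Long> counts = new EnumMap<>(NetTimeLong.class);
        for (NetTimeLong c : NetTimeLong.values()) {
            counts.put(c, 0L); // start every counter at zero
        }
        for (String line : lines) {
            double minutes = Double.parseDouble(line.trim());
            // Exactly 30 minutes belongs to LOW30M, as in the mapper
            NetTimeLong bucket = minutes > 30 ? NetTimeLong.OVER30M : NetTimeLong.LOW30M;
            counts.merge(bucket, 1L, Long::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        // prints {OVER30M=2, LOW30M=2}
        System.out.println(tally(Arrays.asList("12.5", "30", "45", "90")));
    }
}
```

The point of the enum is the same in both versions: the set of counters is fixed at compile time, so a misspelled counter name is a compile error rather than a silently created new counter.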