* Licensed to the Apache Software Foundation (ASF) under one package org.apache.hadoop.mapreduce; import java.io.IOException; /** * Maps input key/value pairs to a set of intermediate key/value pairs. * * <p>Maps are the individual tasks which transform input records into a * intermediate records. The transformed intermediate records need not be of * the same type as the input records. A given input pair may map to zero or * many output pairs.</p> * * <p>The Hadoop Map-Reduce framework spawns one map task for each * {@link InputSplit} generated by the {@link InputFormat} for the job. * <code>Mapper</code> implementations can access the {@link Configuration} for * the job via the {@link JobContext#getConfiguration()}. * * <p>The framework first calls * {@link #setup(org.apache.hadoop.mapreduce.Mapper.Context)}, followed by * {@link #map(Object, Object, Context)} * for each key/value pair in the <code>InputSplit</code>. Finally * {@link #cleanup(Context)} is called.</p> * * <p>All intermediate values associated with a given output key are * subsequently grouped by the framework, and passed to a {@link Reducer} to * determine the final output. Users can control the sorting and grouping by * specifying two key {@link RawComparator} classes.</p> * * <p>The <code>Mapper</code> outputs are partitioned per * <code>Reducer</code>. Users can control which keys (and hence records) go to * which <code>Reducer</code> by implementing a custom {@link Partitioner}. * * <p>Users can optionally specify a <code>combiner</code>, via * {@link Job#setCombinerClass(Class)}, to perform local aggregation of the * intermediate outputs, which helps to cut down the amount of data transferred * from the <code>Mapper</code> to the <code>Reducer</code>. * * <p>Applications can specify if and how the intermediate * outputs are to be compressed and which {@link CompressionCodec}s are to be * used via the <code>Configuration</code>.</p> * * <p>If the job has zero * reduces then the output of the <code>Mapper</code> is directly written * to the {@link OutputFormat} without sorting by keys.</p> * * <p>Example:</p> * <p><blockquote><pre> * public class TokenCounterMapper * extends Mapper<Object, Text, Text, IntWritable>{ * * private final static IntWritable one = new IntWritable(1); * private Text word = new Text(); * * public void map(Object key, Text value, Context context) throws IOException { * StringTokenizer itr = new StringTokenizer(value.toString()); * while (itr.hasMoreTokens()) { * word.set(itr.nextToken()); * context.collect(word, one); * } * } * } * </pre></blockquote></p> * * <p>Applications may override the {@link #run(Context)} method to exert * greater control on map processing e.g. multi-threaded <code>Mapper</code>s * etc.</p> * * @see InputFormat * @see JobContext * @see Partitioner * @see Reducer */ public class Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> { public class Context extends MapContext<KEYIN,VALUEIN,KEYOUT,VALUEOUT> { public Context(Configuration conf, TaskAttemptID taskid, RecordReader<KEYIN,VALUEIN> reader, RecordWriter<KEYOUT,VALUEOUT> writer, OutputCommitter committer, StatusReporter reporter, InputSplit split) throws IOException, InterruptedException { super(conf, taskid, reader, writer, committer, reporter, split); } } /** * Called once at the beginning of the task. */ protected void setup(Context context ) throws IOException, InterruptedException { // NOTHING } /** * Called once for each key/value pair in the input split. Most applications * should override this, but the default is the identity function. */ @SuppressWarnings("unchecked") protected void map(KEYIN key, VALUEIN value, Context context) throws IOException, InterruptedException { context.write((KEYOUT) key, (VALUEOUT) value); } /** * Called once at the end of the task. */ protected void cleanup(Context context ) throws IOException, InterruptedException { // NOTHING } /** * Expert users can override this method for more complete control over the * execution of the Mapper. * @param context * @throws IOException */ public void run(Context context) throws IOException, InterruptedException { setup(context); while (context.nextKeyValue()) { map(context.getCurrentKey(), context.getCurrentValue(), context); } cleanup(context); } }
Mapper的四個方法是setup,map,cleanup和run。其中,setup和cleanup用於管理Mapper生命週期中的資源,setup在完成Mapper構造,即將開始執行map動做前調用,cleanup則在全部的map動做完成後被調用。方法map用於對一次輸入的key/value對進行map動做。run方法執行了上面描述的過程,它調用setup,讓後迭代全部的key/value對,進行map,最後調用cleanup。 java
org.apache.hadoop.mapreduce.lib.map中實現了Mapper的三個子類,分別是InverseMapper(將輸入<key, value> map爲輸出<value, key>),MultithreadedMapper(多線程執行map方法)和TokenCounterMapper(對輸入的value分解爲token並計數)。其中最複雜的是MultithreadedMapper,咱們就以它爲例,來分析Mapper的實現。 apache
InverseMapper源代碼: 安全
* Licensed to the Apache Software Foundation (ASF) under one package org.apache.hadoop.mapreduce.lib.map; import java.io.IOException; /** A {@link Mapper} that swaps keys and values. */ public class InverseMapper<K, V> extends Mapper<K,V,V,K> { /** The inverse function. Input keys and values are swapped.*/ @Override public void map(K key, V value, Context context ) throws IOException, InterruptedException { context.write(value, key); } }
TokenCountMapper源代碼: 多線程
* Licensed to the Apache Software Foundation (ASF) under one package org.apache.hadoop.mapreduce.lib.map; import java.io.IOException; /** * Tokenize the input values and emit each word with a count of 1. */ public class TokenCounterMapper extends Mapper<Object, Text, Text, IntWritable>{ private final static IntWritable one = new IntWritable(1); private Text word = new Text(); @Override public void map(Object key, Text value, Context context ) throws IOException, InterruptedException { StringTokenizer itr = new StringTokenizer(value.toString()); while (itr.hasMoreTokens()) { word.set(itr.nextToken()); context.write(word, one); } } }
MultithreadedMapper會啓動多個線程執行另外一個Mapper的map方法,它會啓動mapred.map.multithreadedrunner.threads(配置項)個線程執行Mapper:mapred.map.multithreadedrunner.class(配置項)。MultithreadedMapper重寫了基類Mapper的run方法,啓動N個線程(對應的類爲MapRunner)執行mapred.map.multithreadedrunner.class(咱們稱爲目標Mapper)的run方法(就是說,目標Mapper的setup和cleanup會被執行屢次)。目標Mapper共享同一份InputSplit,這就意味着,對InputSplit的數據讀必須線程安全。爲此,MultithreadedMapper引入了內部類SubMapRecordReader,SubMapRecordWriter,SubMapStatusReporter,分別繼承自RecordReader,RecordWriter和StatusReporter,它們經過互斥訪問MultithreadedMapper的Mapper.Context,實現了對同一份InputSplit的線程安全訪問,爲Mapper提供所需的Context。這些類的實現方法都很簡單。 app