Hadoop Mapper Analysis

package org.apache.hadoop.mapreduce;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;

/** 
 * Maps input key/value pairs to a set of intermediate key/value pairs.  
 * 
 * <p>Maps are the individual tasks which transform input records into 
 * intermediate records. The transformed intermediate records need not be of 
 * the same type as the input records. A given input pair may map to zero or 
 * many output pairs.</p> 
 * 
 * <p>The Hadoop Map-Reduce framework spawns one map task for each 
 * {@link InputSplit} generated by the {@link InputFormat} for the job.
 * <code>Mapper</code> implementations can access the {@link Configuration} for 
 * the job via the {@link JobContext#getConfiguration()}.
 * 
 * <p>The framework first calls 
 * {@link #setup(org.apache.hadoop.mapreduce.Mapper.Context)}, followed by
 * {@link #map(Object, Object, Context)} 
 * for each key/value pair in the <code>InputSplit</code>. Finally 
 * {@link #cleanup(Context)} is called.</p>
 * 
 * <p>All intermediate values associated with a given output key are 
 * subsequently grouped by the framework, and passed to a {@link Reducer} to  
 * determine the final output. Users can control the sorting and grouping by 
 * specifying two key {@link RawComparator} classes.</p>
 *
 * <p>The <code>Mapper</code> outputs are partitioned per 
 * <code>Reducer</code>. Users can control which keys (and hence records) go to 
 * which <code>Reducer</code> by implementing a custom {@link Partitioner}.
 * 
 * <p>Users can optionally specify a <code>combiner</code>, via 
 * {@link Job#setCombinerClass(Class)}, to perform local aggregation of the 
 * intermediate outputs, which helps to cut down the amount of data transferred 
 * from the <code>Mapper</code> to the <code>Reducer</code>.
 * 
 * <p>Applications can specify if and how the intermediate
 * outputs are to be compressed and which {@link CompressionCodec}s are to be
 * used via the <code>Configuration</code>.</p>
 *  
 * <p>If the job has zero
 * reduces then the output of the <code>Mapper</code> is directly written
 * to the {@link OutputFormat} without sorting by keys.</p>
 * 
 * <p>Example:</p>
 * <p><blockquote><pre>
 * public class TokenCounterMapper 
 *     extends Mapper<Object, Text, Text, IntWritable>{
 *    
 *   private final static IntWritable one = new IntWritable(1);
 *   private Text word = new Text();
 *   
 *   public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
 *     StringTokenizer itr = new StringTokenizer(value.toString());
 *     while (itr.hasMoreTokens()) {
 *       word.set(itr.nextToken());
 *       context.write(word, one);
 *     }
 *   }
 * }
 * </pre></blockquote></p>
 *
 * <p>Applications may override the {@link #run(Context)} method to exert 
 * greater control on map processing e.g. multi-threaded <code>Mapper</code>s 
 * etc.</p>
 * 
 * @see InputFormat
 * @see JobContext
 * @see Partitioner  
 * @see Reducer
 */
public class Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> {

  public class Context 
    extends MapContext<KEYIN,VALUEIN,KEYOUT,VALUEOUT> {
    public Context(Configuration conf, TaskAttemptID taskid,
                   RecordReader<KEYIN,VALUEIN> reader,
                   RecordWriter<KEYOUT,VALUEOUT> writer,
                   OutputCommitter committer,
                   StatusReporter reporter,
                   InputSplit split) throws IOException, InterruptedException {
      super(conf, taskid, reader, writer, committer, reporter, split);
    }
  }
  
  /**
   * Called once at the beginning of the task.
   */
  protected void setup(Context context
                       ) throws IOException, InterruptedException {
    // NOTHING
  }

  /**
   * Called once for each key/value pair in the input split. Most applications
   * should override this, but the default is the identity function.
   */
  @SuppressWarnings("unchecked")
  protected void map(KEYIN key, VALUEIN value, 
                     Context context) throws IOException, InterruptedException {
    context.write((KEYOUT) key, (VALUEOUT) value);
  }

  /**
   * Called once at the end of the task.
   */
  protected void cleanup(Context context
                         ) throws IOException, InterruptedException {
    // NOTHING
  }
  
  /**
   * Expert users can override this method for more complete control over the
   * execution of the Mapper.
   * @param context the context of the task
   * @throws IOException
   */
  public void run(Context context) throws IOException, InterruptedException {
    setup(context);
    while (context.nextKeyValue()) {
      map(context.getCurrentKey(), context.getCurrentValue(), context);
    }
    cleanup(context);
  }
}

Mapper has four methods: setup, map, cleanup, and run. setup and cleanup manage the resources tied to the Mapper's lifecycle: setup is called after the Mapper is constructed, just before map processing begins, and cleanup is called once all map calls have completed. The map method performs the map operation on a single input key/value pair. The run method drives the whole process described above: it calls setup, then iterates over every key/value pair in the split, invoking map on each, and finally calls cleanup.
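
To make the lifecycle concrete, here is a minimal sketch (not from the Hadoop sources; the class name, the property demo.prefix, and the __records_seen__ summary key are invented for illustration) in which setup reads a configuration value once, map uses it for every record, and cleanup emits a single per-task summary pair:

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

// Sketch only: demonstrates when each lifecycle method runs.
public class LifecycleMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

  private static final IntWritable ONE = new IntWritable(1);
  private Text outKey = new Text();
  private String prefix;     // read once in setup()
  private long recordsSeen;  // reported once in cleanup()

  @Override
  protected void setup(Context context)
      throws IOException, InterruptedException {
    // "demo.prefix" is an assumed, application-defined property.
    prefix = context.getConfiguration().get("demo.prefix", "");
  }

  @Override
  protected void map(LongWritable key, Text value, Context context)
      throws IOException, InterruptedException {
    recordsSeen++;
    outKey.set(prefix + value.toString());
    context.write(outKey, ONE);
  }

  @Override
  protected void cleanup(Context context)
      throws IOException, InterruptedException {
    // One summary pair per map task; the key name is made up.
    context.write(new Text("__records_seen__"), new IntWritable((int) recordsSeen));
  }
}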

The package org.apache.hadoop.mapreduce.lib.map provides three Mapper subclasses: InverseMapper (maps an input <key, value> pair to the output <value, key>), MultithreadedMapper (runs the map method on multiple threads), and TokenCounterMapper (splits each input value into tokens and counts them). The most complex of the three is MultithreadedMapper, so we will use it as our example for analyzing how Mappers are implemented.
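
Before diving in, a minimal driver sketch shows how these library mappers drop into a job without any custom map code (the class name, job name, and paths are placeholders; the Job constructor matches the old-style API of this era, and IntSumReducer is the stock summing reducer from org.apache.hadoop.mapreduce.lib.reduce):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.map.TokenCounterMapper;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.reduce.IntSumReducer;

// Sketch: word count built entirely from library classes.
public class LibraryWordCount {
  public static void main(String[] args) throws Exception {
    Job job = new Job(new Configuration(), "library word count");
    job.setJarByClass(LibraryWordCount.class);
    job.setMapperClass(TokenCounterMapper.class);  // tokenize and emit (word, 1)
    job.setCombinerClass(IntSumReducer.class);     // local aggregation, as described above
    job.setReducerClass(IntSumReducer.class);      // final per-word sums
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);
    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}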

InverseMapper source code:

package org.apache.hadoop.mapreduce.lib.map;

import java.io.IOException;

import org.apache.hadoop.mapreduce.Mapper;

/** A {@link Mapper} that swaps keys and values. */
public class InverseMapper<K, V> extends Mapper<K,V,V,K> {

  /** The inverse function.  Input keys and values are swapped.*/
  @Override
  public void map(K key, V value, Context context
                  ) throws IOException, InterruptedException {
    context.write(value, key);
  }

}

TokenCounterMapper source code:

package org.apache.hadoop.mapreduce.lib.map;

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

/**
 * Tokenize the input values and emit each word with a count of 1.
 */
public class TokenCounterMapper extends Mapper<Object, Text, Text, IntWritable>{

  private final static IntWritable one = new IntWritable(1);
  private Text word = new Text();

  @Override
  public void map(Object key, Text value, Context context
                  ) throws IOException, InterruptedException {
    StringTokenizer itr = new StringTokenizer(value.toString());
    while (itr.hasMoreTokens()) {
      word.set(itr.nextToken());
      context.write(word, one);
    }
  }
}

MultithreadedMapper starts multiple threads that each run another Mapper's map method: it spawns mapred.map.multithreadedrunner.threads (a configuration property) threads to execute the Mapper named by mapred.map.multithreadedrunner.class (another configuration property). MultithreadedMapper overrides the base Mapper's run method and launches N threads (each backed by the class MapRunner) that execute the run method of mapred.map.multithreadedrunner.class, which we will call the target Mapper. This means the target Mapper's setup and cleanup are executed multiple times, once per thread. The target Mapper instances share a single InputSplit, so reads from that InputSplit must be thread-safe. To this end, MultithreadedMapper introduces the inner classes SubMapRecordReader, SubMapRecordWriter, and SubMapStatusReporter, which extend RecordReader, RecordWriter, and StatusReporter respectively. By accessing MultithreadedMapper's Mapper.Context under mutual exclusion, they provide thread-safe access to the shared InputSplit and supply each target Mapper with the Context it needs. The implementations of these classes are all quite simple.
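
As a configuration sketch (assuming MultithreadedMapper's static helpers setMapperClass and setNumberOfThreads, which write the two configuration properties named above; the class name and the thread count of 8 are illustrative), running TokenCounterMapper as the target Mapper looks like this:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.map.MultithreadedMapper;
import org.apache.hadoop.mapreduce.lib.map.TokenCounterMapper;

public class MultithreadedSetup {
  public static Job configure(Configuration conf) throws Exception {
    Job job = new Job(conf, "multithreaded token count");
    // The job's mapper is MultithreadedMapper itself; at run time it reads
    // the two properties above to find the target Mapper and thread count.
    job.setMapperClass(MultithreadedMapper.class);
    // Each MapRunner thread gets its own TokenCounterMapper instance,
    // so the target Mapper's setup()/cleanup() run once per thread.
    MultithreadedMapper.setMapperClass(job, TokenCounterMapper.class);
    MultithreadedMapper.setNumberOfThreads(job, 8);
    return job;
  }
}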
