MapReduce Study Notes: Combiner, Partitioner, JobHistory

1. Combiner

In the MapReduce programming model, a very important component sits between the Mapper and the Reducer; it is mainly used to relieve the MR performance bottleneck.

[Figure: the Combiner in the MapReduce data flow, between the Mapper and the Reducer]

The combiner is really an optimization: because network bandwidth is limited, the amount of data transferred between map and reduce should be kept as small as possible. On the map side it merges the key-value pairs that share the same key and aggregates them with the same logic as the reducer, so the combiner can be seen as a special, local reducer. For example, three map outputs (hello,1), (hello,1), (hello,1) are collapsed into a single (hello,3) before being sent over the network; this only works when the aggregation is commutative and associative (sums and maxima qualify, averages do not).
The combiner runs only if the developer sets one in the program (a custom combiner is registered via job.setCombinerClass(MyCombine.class)).

In word count, the reducer itself is used directly as the combiner:

// set the map-side combiner
    job.setCombinerClass(MyReducer.class);
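
If the combine logic ever needs its own class rather than reusing the reducer, a minimal sketch could look like the following; the class name MyCombiner is illustrative and not part of the original program, and the types match the word-count example:

public static class MyCombiner extends Reducer<Text, LongWritable, Text, LongWritable> {

    @Override
    protected void reduce(Text key, Iterable<LongWritable> values, Context context)
            throws IOException, InterruptedException {
        long sum = 0;
        for (LongWritable value : values) {
            // pre-aggregate the counts on the map side
            sum += value.get();
        }
        // the key/value types written here must match the map output types
        context.write(key, new LongWritable(sum));
    }
}

// register it in the driver
job.setCombinerClass(MyCombiner.class);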

Reference: https://www.tuicool.com/articles/qAzUjav

2. Partitioner

  The Partitioner is another important MR component; its main functions are:

    1) The Partitioner decides which ReduceTask handles each record output by a MapTask.

    2) Default implementation: the hash value of the key, modulo the number of ReduceTasks (see the sketch below).

    which reducer = (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks gives the target reducer for the current record.
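
This default behavior comes from Hadoop's built-in org.apache.hadoop.mapreduce.lib.partition.HashPartitioner, whose implementation is essentially:

public class HashPartitioner<K, V> extends Partitioner<K, V> {

    public int getPartition(K key, V value, int numReduceTasks) {
        // masking with Integer.MAX_VALUE clears the sign bit, so the
        // result of the modulo is never negative
        return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
    }
}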

    Example:

Input file contents:

    xiaomi 200
    huawei 500
    xiaomi 300
    huawei 700
    iphonex 100
    iphonex 30
    iphone7 60

Partition the records above by phone brand and dispatch them to four reducers:
 
package rdb.com.hadoop01.mapreduce;
 
import java.io.IOException;
 
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 
/**
 * 
 * @author rdb
 *
 */
public class PartitionerApp {
 
    /**
     * Mapper: reads each line of the input file
     * @author rdb
     *
     */
    public static class MyMapper extends Mapper<LongWritable, Text, Text, LongWritable>{
         
 
        @Override
        protected void map(LongWritable key, Text value,
                Mapper<LongWritable, Text, Text, LongWritable>.Context context)
                throws IOException, InterruptedException {
            // receive one line of input
            String line = value.toString();
            // split the line on spaces
            String[] words = line.split(" ");
            // emit the map result through the context
            context.write(new Text(words[0]), new LongWritable(Long.parseLong(words[1])));
        }
    }
     
    /**
     * Reducer: merges and sums the counts per key
     * @author rdb
     *
     */
    public static class MyReduce extends Reducer<Text, LongWritable, Text, LongWritable>{
         
        @Override
        protected void reduce(Text key, Iterable<LongWritable> values,
                Reducer<Text, LongWritable, Text, LongWritable>.Context context)
                throws IOException, InterruptedException {
            long sum = 0;
            for (LongWritable value : values){
                // accumulate the count for this key
                sum += value.get();
            }
            // emit the reduce result through the context
            context.write(key, new LongWritable(sum));
        }
    }
     
    /**
     * Custom partitioner: routes each brand to its own reducer
     * @author rdb
     *
     */
    public static class MyPartitioner extends Partitioner<Text, LongWritable>{
 
        @Override
        public int getPartition(Text key, LongWritable value, int numPartitions) {
            if(key.toString().equals("xiaomi")){
                return 0;
            }
            if(key.toString().equals("huawei")){
                return 1;
            }
            if(key.toString().equals("iphonex")){
                return 2;
            }
            return 3;
        }
         
    }
     
    /**
     * Driver: wires together all the information for the MapReduce job
     * @param args
     * @throws IOException 
     */
    public static void main(String[] args) throws Exception {
         
        // create the configuration
        Configuration configuration = new Configuration();
         
        // remove the output directory if it already exists
        Path out = new Path(args[1]);
        FileSystem fileSystem = FileSystem.get(configuration);
        if(fileSystem.exists(out)){
            fileSystem.delete(out, true);
            System.out.println("output exists,but it has deleted");
        }
         
        // create the job
        Job job = Job.getInstance(configuration, "PartitionerApp");
         
        // set the jar by the driver class
        job.setJarByClass(PartitionerApp.class);
         
        // set the job input path
        FileInputFormat.setInputPaths(job, new Path(args[0]));
         
        // set the map-side parameters
        job.setMapperClass(MyMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(LongWritable.class);
         
        // set the reduce-side parameters
        job.setReducerClass(MyReduce.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(LongWritable.class);
         
        // set the combiner class; its logic is the same as the reducer's
        //job.setCombinerClass(MyReduce.class);
         
        // set the custom partitioner
        job.setPartitionerClass(MyPartitioner.class);
        // use four reducers, one per partition
        job.setNumReduceTasks(4);
         
        // set the job output path
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
         
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
 
After packaging, run: hadoop jar ~/lib/hadoop01-0.0.1-SNAPSHOT.jar rdb.com.hadoop01.mapreduce.PartitionerApp
hdfs://hadoop01:8020/partitioner.txt  hdfs://hadoop01:8020/output/partitioner
 
Result: -rw-r--r--   1 hadoop supergroup         11 2018-05-09 06:35 /output/partitioner/part-r-00000
      -rw-r--r--   1 hadoop supergroup         12 2018-05-09 06:35 /output/partitioner/part-r-00001
      -rw-r--r--   1 hadoop supergroup         12 2018-05-09 06:35 /output/partitioner/part-r-00002
      -rw-r--r--   1 hadoop supergroup         11 2018-05-09 06:35 /output/partitioner/part-r-00003
       
[hadoop@hadoop01 lib]$ hadoop fs -text /output/partitioner/part-r-00000
18/05/09 06:36:37 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
xiaomi  500
[hadoop@hadoop01 lib]$ hadoop fs -text /output/partitioner/part-r-00001
18/05/09 06:36:43 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
huawei  1200
[hadoop@hadoop01 lib]$ hadoop fs -text /output/partitioner/part-r-00002
18/05/09 06:36:49 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
iphonex 130
[hadoop@hadoop01 lib]$ hadoop fs -text /output/partitioner/part-r-00003
18/05/09 06:36:57 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
iphone7 60

3. JobHistory

 JobHistory records the run logs of finished MapReduce jobs; the log information is stored in an HDFS directory. This feature is disabled by default and must be configured.

1) Configure hadoop-2.6.0-cdh5.7.0/etc/hadoop/mapred-site.xml:

<property>
    <name>mapreduce.jobhistory.address</name>
    <value>hadoop01:10020</value>
    <description>MR JobHistory Server管理的日誌的存放位置</description>
</property>
<property>
    <name>mapreduce.jobhistory.webapp.address</name>
    <value>hadoop01:19888</value>
    <description>查看歷史服務器已經運行完的Mapreduce做業記錄的web地址,須要啓動該服務才行</description>
</property>
<property>
    <name>mapreduce.jobhistory.done-dir</name>
    <value>/history/done</value>
    <description>MR JobHistory Server管理的日誌的存放位置,默認:/mr-history/done</description>
</property>
<property>
    <name>mapreduce.jobhistory.intermediate-done-dir</name>
    <value>/history/done_intermediate</value>
    <description>MapReduce做業產生的日誌存放位置,默認值:/mr-history/tmp</description>
</property>

2) After configuring, restart YARN, then start the JobHistory service: hadoop-2.6.0-cdh5.7.0/sbin/mr-jobhistory-daemon.sh start historyserver

[hadoop@hadoop01 sbin]$ jps
24321 JobHistoryServer
24353 Jps
23957 NodeManager
7880 DataNode
8060 SecondaryNameNode
23854 ResourceManager
7791 NameNode
[hadoop@hadoop01 sbin]$

3) Open in a browser: http://192.168.44.183:19888/

     Run a MapReduce job in the background: hadoop jar ~/lib/hadoop01-0.0.1-SNAPSHOT.jar rdb.com.hadoop01.mapreduce.WordCountApp hdfs://hadoop01:8020/hello.txt  hdfs://hadoop01:8020/output/wc
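
Once the job finishes, its history files should also land in the done directory configured earlier; a quick sanity check, assuming the mapred-site.xml values above:

hadoop fs -ls /history/done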

Refresh the browser and the log of the job that just ran shows up:

[Screenshot: JobHistory web UI listing the finished job]

Click the logs link of the corresponding MR job on the page to view the detailed log.

