MapReduce並行編程模型和框架

傳統的串行處理方式

有四組文本數據:html

「the weather is good」,
「today is good」,
「good weather is good」,
「today has good weather」java

對這些文本數據進行詞頻統計:apache

import java.util.Hashtable;
import java.util.Iterator;
import java.util.StringTokenizer;

/** * 傳統的串行計算方式詞頻統計 * * @version 2017年1月12日 下午4:05:33 */

public class WordCount {

    public static void main(String[] args) {

        String[] text = new String[]{
                "the weather is good","today is good",
                "good weather is good","today has good weather"
        };

        //同步、線程安全
         Hashtable ht = new Hashtable();
         //HashMap ht = new HashMap();
        for(int i=0;i<=3;i++){
             //字符串根據分隔符解析
            StringTokenizer st  = new StringTokenizer(text[i]);

            while (st.hasMoreTokens()) {

                String world = st.nextToken();

                if(!ht.containsKey(world)){
                    ht.put(world, new Integer(1));
                }else{
                    int wc  = ((Integer)ht.get(world)).intValue()+1;
                    ht.put(world, new Integer(wc));     
                }
            }//end of while 
        }//end of for

        //輸出統計結果
        for(Iterator itr = ht.keySet().iterator();itr.hasNext();){
            String world = (String) itr.next();
            System.out.println(world+": " +(Integer)ht.get(world)+ "; ");

        }


    }
}

一個MR分佈式程序

求出每一個年份的最高氣溫:編程

MaxTemperatureMapper.Java:

import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;


public class MaxTemperatureMapper extends Mapper<LongWritable, Text,Text, IntWritable>
{
      @Override
    protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, IntWritable>.Context context)
            throws IOException, InterruptedException {

           //解析字段
           String line =value.toString(); 
           try{

               String year  = line.substring(0,4);
               int airTemperature =Integer.parseInt(line.substring(5));

               context.write(new Text(year),new IntWritable(airTemperature));

           }catch(Exception e){
               System.out.println("error in line:" + line);
           }
    }

}  

MaxTemperatureReducer.java:

import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
/** * reducer 比較每一年度溫度最高值 * */
public class MaxTemperatureReducer extends Reducer<Text, IntWritable, Text, IntWritable> {


   @Override
    protected void reduce(Text key, Iterable<IntWritable> values,
            Reducer<Text, IntWritable, Text, IntWritable>.Context context) throws IOException, InterruptedException {

            int MaxValue = Integer.MIN_VALUE;

            for(IntWritable value:values){
                MaxValue = Math.max(MaxValue, value.get());
            }

            context.write(key , new IntWritable(MaxValue));
    }
}

MaxTemperatureDriver.java:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class MaxTemperatureDriver extends Configured implements Tool{

    @Override
    public int run(String[] args) throws Exception {
        // 對 參數進行判斷:參數個數不爲2,打印錯誤信息 
        if (args.length != 2){

            System.err.printf("Usage: %s <input><output>",getClass().getSimpleName());

            ToolRunner.printGenericCommandUsage(System.err);

            return -1; 

    }        

        Configuration conf =getConf(); 

        @SuppressWarnings("deprecation") //不檢測過時的方法
        Job job = new Job(conf);

        job.setJobName("Max Temperature"); 

        job.setJarByClass(getClass());

        FileInputFormat.addInputPath(job,new Path(args[0]));

        FileOutputFormat.setOutputPath(job,new Path(args[1])); 

        job.setMapperClass(MaxTemperatureMapper.class);

        job.setReducerClass(MaxTemperatureReducer.class); 

        job.setOutputKeyClass(Text.class);

        job.setOutputValueClass(IntWritable.class); 

        return job.waitForCompletion(true)?0:1; 
    }


    public static void main(String[] args)throws Exception{

        int exitcode = ToolRunner.run(new MaxTemperatureDriver(), args);

        System.exit(exitcode); 

    }
}

上傳數據至hadoop集羣:安全

這裏寫圖片描述

原始數據:
Temperature1:ruby

1990 21

1990 18

1991 21

1992 30

1990 21

Temperature2:markdown

1991 21

1990 18

1991 24

1992 30

1993 21

將程序打包上傳至主節點某個目錄下,執行網絡

hadoop jar  /data/jar/maxtemperature.jar   hdfs://192.168.75.128:9000/input  hdfs://192.168.75.128:9000/output/temperature

執行結果:架構

結果數據:app

1990    21
1991    24
1992    30
1993    21

完整的MapReduce編程模型

Combiner:進行中間結果數據網絡傳輸優化的工做。Combiner程序的執行是在Map節點完成計算以後、輸出結果以前。

Partitioner:將全部主鍵相同的鍵值對傳輸給同一個Reduce節點。分區的過程在Map節點輸出後、傳入Reduce節點以前完成的。

下面是針對四組數據的MapReduce完整的並行編程模型:

「the weather is good」,
「today is good」,
「good weather is good」,
「today has good weather」

這裏寫圖片描述

 

完整的MapReduce編程模型

 

(1)用戶程序會分紅三個部分:Mapper,Reducer,Driver
(2)Mapper的輸入數據是KV對的形式,KV的類型能夠設置
(3)Mapper的輸出數據是KV對的形式,KV的類型能夠設置
(4)Mapper中的業務邏輯寫在map方法中
(5)map方法是每進來一個KV對調用一次
(6)Reducer的輸入數據應該對應Mapper的輸出數據,也是KV
(7)Reducer的業務邏輯寫在reduce方法中
(8)reduce方法是對每個< key,valueList> 調用一次
(9)用戶的Mapper和Reducer都要繼承各自的父類
(10)整個程序須要一個Drvier來進行提交,提交的是一個描述了各類必要信息的job對象。

Hadoop系統架構和MapReduce執行流程

爲了實現Hadoop系統設計中本地化計算的原則,數據存儲節點DataNode與計算節點TaskTracker將合併設置,讓每一個從節點同時運行做爲DataNode和TaskTracker,以此讓每一個Tasktracker儘可能處理存儲在本地DataNode上的數據。

而數據存儲主控節點NameNode與做業執行主控節點JobTracker既能夠設置在同一個主控節點上,在集羣規模較大或者這兩個主控節點負載都很高以致於互相影響時,也能夠分開設置在兩個不一樣的節點上。

這裏寫圖片描述

 

Hadoop系統的基本組成構架

 

MapReduce程序的執行流程:

MapReduce執行一個用戶提交的MapReduce程序的基本過程。

這裏寫圖片描述

 

Hadoop MapReduce 程序執行流程

 

1) 首先,用戶程序客戶端經過做業客戶端接口程序JobClient提交一個用戶程序。
2) 而後JobClient向JobTracker提交做業執行請求並得到一個Job ID。
3) JobClient同時也會將用戶程序做業和待處理的數據文件信息準備好並存儲在HDFS中。
4) JobClient正式向JobTracker提交和執行該做業。
5) JobTracker接受並調度該做業,進行做業的初始化準備工做,根據待處理數據的實際分片狀況,調度和分配必定的Map節點來完成做業。
6) JobTracker 查詢做業中的數據分片信息,構建並準備相應的任務。
7) JobTracker 啓動TaskTracker節點開始執行具體的任務。
8) TaskTracker根據所分配的具體任務,獲取相應的做業數據。
9) TaskTracker節點建立所須要的Java虛擬機,並啓動相應的Map任務(或Reduce任務)的執行。
10) TaskTracker執行完所分配的任務以後,如果Map任務,則把中間結果數據輸出到HDFS中;如果Reduce任務,則輸出最終結果。
11) TaskTracker向JobTracker報告所分配的任務的完成。如果Map任務完成而且後續還有Reduce任務,則JobTracker會分配和啓動Reduce節點繼續處理中間結果並輸出最終結果。

參考學習資料:

1.HashMap和Hashtable的區別:
http://www.importnew.com/7010.html
2.StringTokenizer類的使用方法:
http://yacole.iteye.com/blog/41512

相關文章
相關標籤/搜索