MapReduce實例&YARN框架

MapReduce實例&YARN框架


一個wordcount程序

統計一個至關大的數據文件中,每一個單詞出現的個數。html

1、分析map和reduce的工做

map:java

  1. 切分單詞
  2. 遍歷單詞數據輸出

reduce:linux

對從map中獲得的數據的valuelist遍歷累加,獲得一個單詞的總次數apache

2、代碼

WordCountMapper(繼承Mapper)

重寫Mapper類的map方法。windows

mapreduce框架每讀一行數據就調用一次該方法,map的具體業務邏輯就寫在這個方法體中。緩存

  1. map和reduce的數據輸入輸出都是以key-value對的形式封裝的
  2. 4個泛型中,前兩個(KEYIN, VALUEIN)指定mapper輸入數據的類型, 後兩個(KEYOUT, VALUEOUT)指定輸出數據的類型
  3. 默認狀況下,框架傳遞給mapper的輸入數據中,key是要處理的文本中一行的起始偏移量,value是這行的內容
  4. 因爲輸入輸出在結點中經過網絡傳遞,數據須要序列化,但JDK自帶的序列化機制會有附加信息冗餘,對於大量數據傳輸不合適,所以 <Long, String, String, Long> -> <LongWritable, Text, Text, LongWritable>
  5. 業務中要處理的數據已經做爲參數key-value被傳遞進來了,處理後的輸出是調用context.write()寫入到context
package cn.thousfeet.hadoop.mapreduce.wordcount;

import java.io.IOException;

import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;


public class WordCountMapper extends Mapper<LongWritable, Text, Text, LongWritable>{
    
    @Override
    protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, LongWritable>.Context context)
            throws IOException, InterruptedException {
        
        String line = value.toString();
        
        String[] words = StringUtils.split(line," "); //切分單詞
        
        for(String word : words) //遍歷 輸出爲key-value( <word,1> )
        {
            context.write(new Text(word), new LongWritable(1));
        }
    
    }
    
}

WordCountReducer(繼承Reducer)

重寫Reducer類的reduce方法。網絡

框架在map處理完成後,將全部的key-value對緩存起來進行分組,而後傳遞到一個組 <key,values{}>(對於wordcount程序,拿到的就是相似<hello,{1,1,1,1...}>),而後調用一次reduce方法。app

package cn.thousfeet.hadoop.mapreduce.wordcount;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class WordCountReducer extends Reducer<Text, LongWritable, Text, LongWritable>{

    @Override
    protected void reduce(Text key, Iterable<LongWritable> valueList,
            Reducer<Text, LongWritable, Text, LongWritable>.Context context) throws IOException, InterruptedException {
        
        long count = 0;
        
        for(LongWritable value : valueList) //遍歷value list累加求和
        {
            count += value.get();
        }
        
        context.write(key, new LongWritable(count)); //輸出這一個單詞的統計結果
    }
}

WordCountRunner

用於描述job。框架

好比,該做業使用哪一個類做爲邏輯處理中的map,哪一個做爲reduce。還能夠指定該做業要處理的數據所在的路徑,和輸出的結果放到哪一個路徑。eclipse

package cn.thousfeet.hadoop.mapreduce.wordcount;

import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;

public class WordCountRunner {

        public static void main(String[] args) throws Exception {
            
            Configuration conf = new Configuration();
            Job job = Job.getInstance(conf);
            
            //設置整個job所用的那些類在哪一個jar包
            job.setJarByClass(WordCountRunner.class);
            
            //指定job使用的mapper和reducer類
            job.setMapperClass(WordCountMapper.class);
            job.setReducerClass(WordCountReducer.class);
            
            //指定reduce和mapper的輸出數據key-value類型
            job.setOutputKeyClass(Text.class);
            job.setMapOutputValueClass(LongWritable.class);
            
            //指定mapper的輸出數據key-value類型
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(LongWritable.class);
            
            //指定原始輸入數據的存放路徑
            FileInputFormat.setInputPaths(job, new Path("/wordcount/srcdata/"));
            
            //指定處理結果數據的存放路徑
            FileOutputFormat.setOutputPath(job, new Path("/wordcount/output/"));
        
            //將job提交給集羣運行 參數爲true時會打印運行進度
            job.waitForCompletion(true);
        }
}


上傳到集羣中運行

export成一個jar包,上傳到虛擬機上。

分發到集羣運行:hadoop jar wordcount.jar cn.thousfeet.hadoop.mapreduce.wordcount.WordCountRunner

查看輸出結果:

(能夠看到按key的字典序升序排序)


MapReduce程序幾種不一樣的提交運行模式

方式一:本機的JVM運行

首先,由於要在windows下直接調試,須要在eclipse的設置 Run Configurations->arguments->vm arguments ,添加-DHADOOP_USER_NAME=對應用戶

如需在本地直接run main方法(MapReduce程序在本機的JVM運行),要把輸入輸出路徑改成hdfs全路徑或把site.xml配置文件拖進來(或用在windows本地目錄下的數據也行,MapReduce程序的運行和數據來源在哪無關)。

方式二:本地debug實際運行在集羣

如需實如今本地run main方法而MapReduce實際運行在集羣(這種方式必須在linux下),應:

  1. 將mapred-site.xml和yarn-site.xml拖到工程的src目錄下(或給conf配置mapreduce.framework.nameyarn.resourcemanager.hostname等參數)
  2. 給工程導出一個jar包(好比放在工程目錄下),配置該job的jar包的路徑conf.set("mapreduce.job.jar","wordcount.jar");

(在windows下要用這種方法須要修改hadoop的YarnRunner這個類的源碼,或者安裝插件什麼的..)

提交到yarn集羣的job能夠在yarn的管理頁面(8088端口)看到。


yarn框架的運行機制

yarn只負責資源的分配,而後啓動運算框架的主管進程AppMaster(如運算框架是MapReduce時主管進程就是它的MRAppMaster),剩下的工做就不禁yarn去作了。

MapReduce只適合作數據的批量離線處理,而不適用於實時性的需求,要實現實時性要使用的運算框架是spark、storm那些,但均可以放在yarn框架下。yarn和運算框架分離的策略使得hadoop具備普遍的實用性和生命力。


yarn提交job的流程(關鍵源碼)


坑點

org.apache.hadoop.security.AccessControlException

運行程序後查看output文件夾能看到運行成功了,可是cat查看part-r-00000的時候報錯

error creating legacy BlockReaderLocal. Disabling legacy local reads.
org.apache.hadoop.security.AccessControlException: Can't continue with getBlockLocalPathInfo() authorization. The user thousfeet is not configured in dfs.block.local-path-access.user

解決方法是hdfs-site.xml中的配置項dfs.client.read.shortcircuit=false
woc,這個參數其實本來默認就是false...忽然想起這不是上次配置出錯的時候病急亂投醫加上的嗎,果真亂跟教程害死人orzz

(參考:http://www.51testing.com/html/59/445759-821244.html)

相關文章
相關標籤/搜索