Hadoop筆記

前言:html

  HADOOP的核心組成部分:HDFS文件系統和Mapreduce。在構建這個大數據分佈式應用框架的過程當中,解決了不少了共性問題而且都封裝爲開源的框架,這些框架徹底能夠拿來用Hadoop APIjava

   Pig 是一個基於Hadoop的大規模數據分析平臺,Pig爲複雜的海量數據並行計算提供了一個簡易的操做和編程接口。
   Chukwa 是基於Hadoop的集羣監控系統,由yahoo貢獻。用於管理大型分佈式系統的數據收集系統(2000+以上的節點, 系統天天產生的監控數據量在T級別)。
   Hive 是基於Hadoop的一個工具,提供完整的sql查詢功能,能夠將sql語句轉換爲MapReduce任務進行運行。
   ZooKeeper 高效的,可擴展的協調系統,存儲和協調關鍵共享狀態。
   HBase 是一個開源的,基於列存儲模型的分佈式數據庫。
   HDFS 是一個分佈式文件系統。有着高容錯性的特色,而且設計用來部署在低廉的硬件上,適合那些有着超大數據集的應用程序。
   MapReduce 是一種編程模型,用於大規模數據集(大於1TB)的並行運算。
   Hadoop RPC 遠程過程調用RPC。
   Hadoop YARN 任務調度和集羣資源管理。
  其餘相關:
   Avro 數據序列化系統,由Doug Cutting牽頭開發,是一個數據序列化系統。
   Cassandra 可擴展的多主數據庫,沒有單點故障。是一套開源分佈式NoSQL數據庫系統。
   Mahout Apache旗下的一個開源項目,提供一些可擴展的機器學習領域經典算法的實現,旨在幫助開發人員更加方便快捷地建立智能應用程序。
   Tez 用於構建高性能批處理和交互式數據處理應用程序的可擴展框架,由Apache Hadoop中的YARN協調。node

一、HDFS文件系統linux

1.1 HDFS有不少特色:摘自http://www.javashuo.com/article/p-fmckmuuk-eo.html  源碼分析git

    ① 保存多個副本,且提供容錯機制,副本丟失或宕機自動恢復。默認存3份。github

    ② 運行在廉價的機器上。算法

    ③ 適合大數據的處理。多大?多小?HDFS默認會將文件分割成block,64M爲1個block。而後將block按鍵值對存儲在HDFS上,並將鍵值對的映射存到內存中。若是小文件太多,那內存的負擔會很重。sql


 NameNode元數據管理機制(非HA模式):數據庫

如上圖所示,HDFS也是按照Master和Slave的結構。分NameNode、SecondaryNameNode、DataNode這幾個角色。

NameNode:是Master節點,是大領導。管理數據塊映射;處理客戶端的讀寫請求;配置副本策略;管理HDFS的名稱空間;

SecondaryNameNode:是一個小弟,分擔大哥namenode的工做量;是NameNode的冷備份;合併fsimage和fsedits而後再發給namenode。

DataNode:Slave節點,奴隸,幹活的。負責存儲client發來的數據塊block;執行數據塊的讀寫操做。

熱備份:b是a的熱備份,若是a壞掉。那麼b立刻運行代替a的工做。

冷備份:b是a的冷備份,若是a壞掉。那麼b不能立刻代替a工做。可是b上存儲a的一些信息,減小a壞掉以後的損失。

fsimage:元數據鏡像文件(文件系統的目錄樹。)

edits:元數據的操做日誌(針對文件系統作的修改操做記錄)

namenode內存中存儲的是=fsimage+edits。

SecondaryNameNode負責定時默認1小時,從namenode上,獲取fsimage和edits來進行合併,而後再發送給namenode。減小namenode的工做量。
View Code

1.2 工做原理apache

   寫操做

有一個文件FileA,100M大小。Client將FileA寫入到HDFS上。

HDFS按默認配置。

HDFS分佈在三個機架上Rack1,Rack2,Rack3。

 

a. Client將FileA按64M分塊。分紅兩塊,block1和Block2;

b. Client向nameNode發送寫數據請求,如圖藍色虛線①------>。

c. NameNode節點,記錄block信息。並返回可用的DataNode,如粉色虛線②--------->。

    Block1: host2,host1,host3

    Block2: host7,host8,host4

    原理:

        NameNode具備RackAware機架感知功能,這個能夠配置。

        若client爲DataNode節點,那存儲block時,規則爲:副本1,同client的節點上;副本2,不一樣機架節點上;副本3,同第二個副本機架的另外一個節點上;其餘副本隨機挑選。

        若client不爲DataNode節點,那存儲block時,規則爲:副本1,隨機選擇一個節點上;副本2,不一樣副本1,機架上;副本3,同副本2相同的另外一個節點上;其餘副本隨機挑選。

d. client向DataNode發送block1;發送過程是以流式寫入。

    流式寫入過程,

        1>將64M的block1按64k的package劃分;

        2>而後將第一個package發送給host2;

        3>host2接收完後,將第一個package發送給host1,同時client想host2發送第二個package;

        4>host1接收完第一個package後,發送給host3,同時接收host2發來的第二個package。

        5>以此類推,如圖紅線實線所示,直到將block1發送完畢。

        6>host2,host1,host3向NameNode,host2向Client發送通知,說「消息發送完了」。如圖粉紅顏色實線所示。

        7>client收到host2發來的消息後,向namenode發送消息,說我寫完了。這樣就真完成了。如圖黃色粗實線

        8>發送完block1後,再向host7,host8,host4發送block2,如圖藍色實線所示。

        9>發送完block2後,host7,host8,host4向NameNode,host7向Client發送通知,如圖淺綠色實線所示。

        10>client向NameNode發送消息,說我寫完了,如圖黃色粗實線。。。這樣就完畢了。

分析,經過寫過程,咱們能夠了解到:

    ①寫1T文件,咱們須要3T的存儲,3T的網絡流量貸款。

    ②在執行讀或寫的過程當中,NameNode和DataNode經過HeartBeat進行保存通訊,肯定DataNode活着。若是發現DataNode死掉了,就將死掉的DataNode上的數據,放到其餘節點去。讀取時,要讀其餘節點去。

    ③掛掉一個節點,不要緊,還有其餘節點能夠備份;甚至,掛掉某一個機架,也不要緊;其餘機架上,也有備份。
View Code

讀操做

操做就簡單一些了,如圖所示,client要從datanode上,讀取FileA。而FileA由block1和block2組成。 

 

那麼,讀操做流程爲:

a. client向namenode發送讀請求。

b. namenode查看Metadata信息,返回fileA的block的位置。

    block1:host2,host1,host3

    block2:host7,host8,host4

c. block的位置是有前後順序的,先讀block1,再讀block2。並且block1去host2上讀取;而後block2,去host7上讀取;

 

上面例子中,client位於機架外,那麼若是client位於機架內某個DataNode上,例如,client是host6。那麼讀取的時候,遵循的規律是:

優選讀取本機架上的數據。
View Code

二、MapReduce

   MapReduce是一種編程模型,用於大規模數據集(大於1TB)的並行運算。概念"Map(映射)"和"Reduce(歸約)",是它們的主要思想。MapReduce極大地方便了編程人員在不會分佈式並行編程的狀況下,將本身的程序運行在分佈式系統上。摘自:http://www.cnblogs.com/wangxin37/p/6501495.html 

   補充: 多個job在同一個main方法中提交:使用Shell腳本組織並執行。     

2.1  單詞分詞實例 

  

    

 

  WCMapper.java

import java.io.IOException;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

//4個泛型中,前兩個是指定mapper輸入數據的類型,KEYIN是輸入的key的類型,VALUEIN是輸入的value的類型
//map 和 reduce 的數據輸入輸出都是以 key-value對的形式封裝的
//默認狀況下,框架傳遞給咱們的mapper的輸入數據中,key是要處理的文本中一行的起始偏移量,這一行的內容做爲value
public class WCMapper extends Mapper<LongWritable, Text, Text, LongWritable>{
    
    //mapreduce框架每讀一行數據就調用一次該方法
    @Override
    protected void map(LongWritable key, Text value,Context context)
            throws IOException, InterruptedException {
        //具體業務邏輯就寫在這個方法體中,並且咱們業務要處理的數據已經被框架傳遞進來,在方法的參數中 key-value
        //key 是這一行數據的起始偏移量     value 是這一行的文本內容
        
        //將這一行的內容轉換成string類型
        String line = value.toString();
        //對這一行的文本按特定分隔符切分
        String[] words = StringUtils.split(line, " ");
        //遍歷這個單詞數組輸出爲kv形式  k:單詞   v : 1
        for(String word : words){
         context.write(new Text(word), new LongWritable(1));
         }
       }
    }
View Code

  WCReducer.java

import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class WCReducer extends Reducer<Text, LongWritable, Text, LongWritable>{
        
    //框架在map處理完成以後,將全部kv對緩存起來,進行分組,而後傳遞一個組<key,valus{}>,調用一次reduce方法
    //<hello,{1,1,1,1,1,1.....}>
    @Override
    protected void reduce(Text key, Iterable<LongWritable> values,Context context)
            throws IOException, InterruptedException {

        long count = 0;
        //遍歷value的list,進行累加求和
        for(LongWritable value:values){
             count += value.get();
        }
        //輸出這一個單詞的統計結果
         context.write(key, new LongWritable(count));
       }
}
View Code

  WCRunner.java

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/**
 * 用來描述一個特定的做業
 * 好比,該做業使用哪一個類做爲邏輯處理中的map,哪一個做爲reduce
 * 還能夠指定該做業要處理的數據所在的路徑
 * 還能夠指定改做業輸出的結果放到哪一個路徑
 * ....
 * @author duanhaitao@itcast.cn
 *
 */
public class WCRunner {

    public static void main(String[] args) throws Exception {
        
        Configuration conf = new Configuration();
        Job wcjob = Job.getInstance(conf);
        
        //設置整個job所用的那些類在哪一個jar包
        wcjob.setJarByClass(WCRunner.class);
                
        //本job使用的mapper和reducer的類
        wcjob.setMapperClass(WCMapper.class);
        wcjob.setReducerClass(WCReducer.class);
                
        //指定reduce的輸出數據kv類型
        wcjob.setOutputKeyClass(Text.class);
        wcjob.setOutputValueClass(LongWritable.class);
        
        //指定mapper的輸出數據kv類型
        wcjob.setMapOutputKeyClass(Text.class);
        wcjob.setMapOutputValueClass(LongWritable.class);
                
        //指定要處理的輸入數據存放路徑
        FileInputFormat.setInputPaths(wcjob, new Path("hdfs://weekend110:9000/wc/srcdata/"));
        
        //指定處理結果的輸出數據存放路徑
        FileOutputFormat.setOutputPath(wcjob, new Path("hdfs://weekend110:9000/wc/output3/"));
        
        //將job提交給集羣運行 
        wcjob.waitForCompletion(true);
                
    }
           
}
View Code

 MR程序的幾種提交運行模式:

本地模型運行
1/在windows的eclipse裏面直接運行main方法,就會將job提交給本地執行器localjobrunner執行
      ----輸入輸出數據能夠放在本地路徑下(c:/wc/srcdata/----輸入輸出數據也能夠放在hdfs中(hdfs://weekend110:9000/wc/srcdata)
2/在linux的eclipse裏面直接運行main方法,可是不要添加yarn相關的配置,也會提交給localjobrunner執行
      ----輸入輸出數據能夠放在本地路徑下(/home/hadoop/wc/srcdata/----輸入輸出數據也能夠放在hdfs中(hdfs://weekend110:9000/wc/srcdata)  
      
集羣模式運行
1/將工程打成jar包,上傳到服務器,而後用hadoop命令提交  hadoop jar wc.jar cn.itcast.hadoop.mr.wordcount.WCRunner
2/在linux的eclipse中直接運行main方法,也能夠提交到集羣中去運行,可是,必須採起如下措施:
      ----在工程src目錄下加入 mapred-site.xml  和  yarn-site.xml 
      ----將工程打成jar包(wc.jar),同時在main方法中添加一個conf的配置參數 conf.set("mapreduce.job.jar","wc.jar");           
3/在windows的eclipse中直接運行main方法,也能夠提交給集羣中運行,可是由於平臺不兼容,須要作不少的設置修改
        ----要在windows中存放一份hadoop的安裝包(解壓好的)
        ----要將其中的lib和bin目錄替換成根據你的windows版本從新編譯出的文件
        ----再要配置系統環境變量 HADOOP_HOME  和 PATH
        ----修改YarnRunner這個類的源碼
View Code

 2.2 MapReduce執行過程

    MR的分發和管理依賴YARN,MR被放在Yarn上運行,實際是放在Yarn的ApplicationMaster中,由ApplicationMaster管理,ApplicationMaster在運行起來後,先向Yarn中ResourceManager註冊並所要資源,ResourceManager就通知每一個節點上的小弟:NodeManager,NodeManager接到通知,馬上執行,最終給ApplicationMaster提供一個運行的環境:Container。而後在Container中MapReduce(也能夠是Spark/Storm其餘計算)中就開始了本身計算。

    MapReduce運行的時候,會經過Mapper運行的任務讀取HDFS中的數據文件(InputFormat),而後調用本身的方法,處理數據(Shuffle),最後輸出(OutputFormat)。Reducer任務會接收Mapper任務輸出的數據,做爲本身的輸入數據,調用本身的方法,最後輸出到HDFS的文件中。在這個過程當中,用戶能夠自定義Reduce的併發數,Map的併發數依賴切片Spilt數,切片數依賴每一個切片的大小,能夠設置。整個流程如圖:

 

1.執行MR的命令:
hadoop jar <jar在linux的路徑> <main方法所在的類的全類名> <參數> 例子:

hadoop jar /root/wc1.jar cn.itcast.d3.hadoop.mr.WordCount hdfs://itcast:9000/words /out2

 2.MR執行流程

(1).客戶端提交一個mr的jar包給JobClient(提交方式:hadoop jar ...)
(2).JobClient經過RPC和JobTracker進行通訊,返回一個存放jar包的地址(HDFS)和jobId
(3).client將jar包寫入到HDFS當中(path = hdfs上的地址 + jobId)
(4).開始提交任務(任務的描述信息,不是jar, 包括jobid,jar存放的位置,配置信息等等)
(5).JobTracker進行初始化任務
(6).讀取HDFS上的要處理的文件,開始計算輸入分片,每個分片對應一個MapperTask
(7).TaskTracker經過心跳機制領取任務(任務的描述信息)
(8).下載所需的jar,配置文件等
(9).TaskTracker啓動一個java child子進程,用來執行具體的任務(MapperTask或ReducerTask)
(10).將結果寫入到HDFS當中
View Code

 2.3 Mapper任務詳解

      每一個Mapper任務是一個java進程,它會讀取HDFS中的文件,解析成不少的鍵值對,通過咱們覆蓋的map方法處理後,轉換爲不少的鍵值對再輸出。整個Mapper任務的處理過程又能夠分爲如下幾個階段,如圖所示:

第一階段是把輸入文件按照必定的標準進行分片(InputSplit),每一個輸入片的大小是固定的。默認狀況下,輸入片(InputSplit)的大小與數據塊(Block)的大小是相同的。若是數據塊(Block)的大小是默認值64MB,輸入文件有兩個,一個是32MB,一個是72MB。那麼小的文件是一個輸入片,大文件會分爲兩個數據塊,那麼是兩個輸入片。一共產生三個輸入片。每個輸入片由一個Mapper進程處理。這裏的三個輸入片,會有三個Mapper進程處理。

第二階段是對輸入片中的記錄按照必定的規則解析成鍵值對。有個默認規則是把每一行文本內容解析成鍵值對。「鍵」是每一行的起始位置(單位是字節),「值」是本行的文本內容。

第三階段是調用Mapper類中的map方法。第二階段中解析出來的每個鍵值對,調用一次map方法。若是有1000個鍵值對,就會調用1000次map方法。每一次調用map方法會輸出零個或者多個鍵值對。

第四階段是按照必定的規則對第三階段輸出的鍵值對進行分區。比較是基於鍵進行的。好比咱們的鍵表示省份(如北京、上海、山東等),那麼就能夠按照不一樣省份進行分區,同一個省份的鍵值對劃分到一個區中。默認是隻有一個區。分區的數量就是Reducer任務運行的數量。默認只有一個Reducer任務。

第五階段是對每一個分區中的鍵值對進行排序。首先,按照鍵進行排序,對於鍵相同的鍵值對,按照值進行排序。好比三個鍵值對<2,2>、<1,3>、<2,1>,鍵和值分別是整數。那麼排序後的結果是<1,3>、<2,1>、<2,2>。若是有第六階段,那麼進入第六階段;若是沒有,直接輸出到本地的linux文件中。

第六階段是對數據進行歸約處理,也就是reduce處理。鍵相等的鍵值對會調用一次reduce方法。通過這一階段,數據量會減小。歸約後的數據輸出到本地的linxu文件中。本階段默認是沒有的,須要用戶本身增長這一階段的代碼。
View Code

 2.4 Reducer任務詳解

    每一個Reducer任務是一個java進程。Reducer任務接收Mapper任務的輸出,歸約處理後寫入到HDFS中,能夠分爲以下圖所示的幾個階段:

 

第一階段是Reducer任務會主動從Mapper任務複製其輸出的鍵值對。Mapper任務可能會有不少,所以Reducer會複製多個Mapper的輸出。

第二階段是把複製到Reducer本地數據,所有進行合併,即把分散的數據合併成一個大的數據。再對合並後的數據排序。

第三階段是對排序後的鍵值對調用reduce方法。鍵相等的鍵值對調用一次reduce方法,每次調用會產生零個或者多個鍵值對。最後把這些輸出的鍵值對寫入到HDFS文件中
View Code

 2.5 Shuffle  

Shuffle過程是MapReduce的核心,也被稱爲奇蹟發生的地方。摘自:http://blog.csdn.net/github_36444580/article/details/75208992

一、MapReduce中,Map階段處理的數據如何傳遞給Reduce階段,是MapReduce框架中最關鍵的一個流程,這個流程就叫Shuffle;
二、Shuffle: 洗牌、發牌(核心機制:數據分區、排序、緩存);
三、具體來講:就是將map task輸出的處理結果數據,分發給reduce task,並在分發的過程當中,對數據按key進行了分區排序,分區和排序能夠分別根據WritableComparable接口、Partitioner接口( 見下圖:AreaPartitioner.java)實現自定義

Shuffle是MR處理流程中的一個過程,它的每個處理步驟是分散在各個map task和reduce task節點上完成的。

 Shuffle緩存流程

 Shuffle運行機制

上面的流程是整個mapreduce最全工做流程,可是shuffle過程只是從第7步開始到第16步結束,具體shuffle過程詳解,以下:
1)maptask收集咱們的map()方法輸出的kv對,放到內存緩衝區中
2)從內存緩衝區不斷溢出本地磁盤文件,可能會溢出多個文件
3)多個溢出文件會被合併成大的溢出文件
4)在溢出過程當中,及合併的過程當中,都要調用partitoner進行分組和針對key進行排序
5)reducetask根據本身的分區號,去各個maptask機器上取相應的結果分區數據
6)reducetask會取到同一個分區的來自不一樣maptask的結果文件,reducetask會將這些文件再進行合併(歸併排序)
7)合併成大文件後,shuffle的過程也就結束了,後面進入reducetask的邏輯運算過程(從文件中取出一個一個的鍵值對group,調用用戶自定義的reduce()方法)

注意
Shuffle中的緩衝區大小會影響到mapreduce程序的執行效率,原則上說,緩衝區越大,磁盤io的次數越少,執行速度就越快。
緩衝區的大小能夠經過參數調整,參數:io.sort.mb  默認100M
View Code

 Partition分區 

若是reduceTask的數量> getPartition的結果數,則會多產生幾個空的輸出文件part-r-000xx;
若是1<reduceTask的數量<getPartition的結果數,則有一部分分區數據無處安放,會Exception;
若是reduceTask的數量=1,則無論mapTask端輸出多少個分區文件,最終結果都交給這一個reduceTask,最終也就只會產生一個結果文件 part-r-00000;
例如:假設自定義分區數爲5,則
(1)job.setNumReduceTasks(1);會正常運行,只不過會產生一個輸出文件
(2)job.setNumReduceTasks(2);會報錯
(3)job.setNumReduceTasks(6);大於5,程序會正常運行,會產生空文件

 改變分組數(Partition):直接影響就是最終Reduce輸出文件個數,Reduce的併發個數=Partintion個數。

 AreaPartitioner.java 

package cn.itcast.hadoop.mr.areapartition;

import java.util.HashMap;

import org.apache.hadoop.mapreduce.Partitioner;

public class AreaPartitioner<KEY, VALUE> extends Partitioner<KEY, VALUE>{

    private static HashMap<String,Integer> areaMap = new HashMap<>();
    
    static{
        areaMap.put("135", 0);
        areaMap.put("136", 1);
        areaMap.put("137", 2);
        areaMap.put("138", 3);
        areaMap.put("139", 4);
    }
        
    @Override
    public int getPartition(KEY key, VALUE value, int numPartitions) {
        //從key中拿到手機號,查詢手機歸屬地字典,不一樣的省份返回不一樣的組號
        
        int areaCoder  = areaMap.get(key.toString().substring(0, 3))==null?5:areaMap.get(key.toString().substring(0, 3));

        return areaCoder;
    }

}
View Code

 FlowSumArea.java

package cn.itcast.hadoop.mr.areapartition;

import java.io.IOException;

import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import cn.itcast.hadoop.mr.flowsum.FlowBean;


/**
 * 對流量原始日誌進行流量統計,將不一樣省份的用戶統計結果輸出到不一樣文件
 * 須要自定義改造兩個機制:
 * 一、改造分區的邏輯,自定義一個partitioner
 * 二、自定義reduer task的併發任務數
 * 
 * @author duanhaitao@itcast.cn
 *
 */
public class FlowSumArea {

    public static class FlowSumAreaMapper extends Mapper<LongWritable, Text, Text, FlowBean>{
        
        @Override
        protected void map(LongWritable key, Text value,Context context)
                throws IOException, InterruptedException {

            //拿一行數據
            String line = value.toString();
            //切分紅各個字段
            String[] fields = StringUtils.split(line, "\t");
            
            //拿到咱們須要的字段
            String phoneNB = fields[1];
            long u_flow = Long.parseLong(fields[7]);
            long d_flow = Long.parseLong(fields[8]);
            
            //封裝數據爲kv並輸出
            context.write(new Text(phoneNB), new FlowBean(phoneNB,u_flow,d_flow));

        }
        
        
    }
    
    
    public static class FlowSumAreaReducer extends Reducer<Text, FlowBean, Text, FlowBean>{
        
        @Override
        protected void reduce(Text key, Iterable<FlowBean> values,Context context)
                throws IOException, InterruptedException {

            long up_flow_counter = 0;
            long d_flow_counter = 0;
            
            for(FlowBean bean: values){
                
                up_flow_counter += bean.getUp_flow();
                d_flow_counter += bean.getD_flow();
                
                
            }
            
            context.write(key, new FlowBean(key.toString(), up_flow_counter, d_flow_counter));
            
            
            
        }
        
    }
    
    public static void main(String[] args) throws Exception {
        
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        
        job.setJarByClass(FlowSumArea.class);
        
        job.setMapperClass(FlowSumAreaMapper.class);
        job.setReducerClass(FlowSumAreaReducer.class);
        
        //設置咱們自定義的分組邏輯定義
        job.setPartitionerClass(AreaPartitioner.class);
        
        
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(FlowBean.class);
        
        //設置reduce的任務併發數,應該跟分組的數量保持一致
        job.setNumReduceTasks(1);
        
        
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        
        
        System.exit(job.waitForCompletion(true)?0:1);
        
        
    }
    
    
}
View Code

Combiner合併

1)combiner是MR程序中Mapper和Reducer以外的一種組件;
2)combiner組件的父類就是Reducer;
3)combiner和reducer的區別在於運行的位置:Combiner是在每個maptask所在的節點運行,Reducer是接收全局全部Mapper的輸出結果;
4)combiner的意義就是對每個maptask的輸出進行局部彙總,以減少網絡傳輸量;
6)combiner可以應用的前提是不能影響最終的業務邏輯,並且,combiner的輸出kv應該跟reducer的輸入kv類型要對應起來Mapper;

相關文章
相關標籤/搜索