大數據學習-----day08-----hadoop05-------0.補充(查詢源代碼的操做)1.MR程序數據處理全流程 2.yarn 3. merger案例(小文件合併)4.數據傾斜 5join

0. 補充(查詢源代碼的操做)node

(1)ctrl+shift+t   查找某個類linux

 

 

 (2)crtl+t查看類的繼承結構程序員

 

 (3)ctrl+o  查看類中的方法算法

 

 

 

 

1. MR程序數據處理全流程編程


 

 

 

 第一步:FileInputFormat找到指定路徑或文件夾(如果文件夾且有多個文件,會開啓多個map任務,默認是一個文件用一個map去處理),經過調用LineRecordReader類中的createKey(),createValue()方法,獲得多個偏移量和每行數據(offset,line)windows

第二步:這個偏移量以及line會做爲MapTask的參數,調用map方法,對全部line數據進行處理,從而獲得k,v(如a,1 a,1 b,1等),以後是調用HashPartitioner組件對map獲得的key進行分區處理(key%n),獲得分區號(partition),而後MapOutputBuffer組件調用collect方法將k,v,partition收集到數組中去,即將meta以及kv寫入環形緩衝區,默認大小100mb,而且在配置文件裏爲這個緩衝區設定了一個閥值,默認是0.80,同時map還會爲輸出操做啓動一個守護線程,若是緩衝區的內存達到了閥值的80%時候,這個守護線程就會把緩衝區中的內容寫到磁盤上,這個過程叫spill(溢出),能夠屢次溢出(產生多個溢出文件),若寫入內存的數據小於默認的100mb,則會溢出一次(產生一個溢出文件)。另外的20%內存能夠繼續寫入數據,寫入磁盤和寫入內存操做是互不干擾的,若是緩存區被撐滿了,那麼map就會阻塞寫入內存的操做,讓寫入磁盤操做完成後再繼續執行寫入內存操做,如此循環使用  數組

 

 每一對kv都會有一個meta來描述,一個meta佔固定4個字節緩存

 

 

 

 

 第三步:溢出的數據寫入磁盤中(經過SequenceFileOutPutFormat進行),能夠進行屢次溢出(當寫入內存的數據小於默認的100mb時,溢出一次),每一次溢出時都會產生一個溢出文件來記錄這些數據。注意:內存中的數據寫入磁盤時會進行歸併而且排序  即獲得上圖所示的數據磁盤中的數據能提供下載服務。shuffle會將各個分區的數據分發到指定的ReduceTask,接下來就是Reduce階段了網絡

 

 

 第四步:經過Facher下載到map階段的數據,並進行歸併排序獲得一個完整的數據,而後經過GroupingPartition組件中的compare方法判斷key是否相同,進而將相同key的值放入同一個迭代器。最後就是從迭代器中獲取這些數據,進行必定的操做獲得本身想要的數據格式,而後存入指定地方,上圖是存入HDFS中,至此整個MR程序數據流程就算走完了多線程

 

 2. yarn

2.1 概述:   

  yarn是一個資源管理系統。主要負責集羣資源的管理和調度,若是要將程序運行在yarn上須要兩個組件 , 客戶端和ApplicationMaster , 這個組件在編程的過程當中很是複雜 , 例如mapreduce運算框架有現成的實現類供程序員使用(JobClient , MRAppMaster)

  •  資源管理平臺 管理集羣的運算資源 和  資源分配
  • 程序運行的監控平臺 監控各個程序的運行情況 (程序任務的再處理分配)

MRAppMaster 是applicationMaster的一種實現 , 能夠將MapReduce程序運行在yarn上 .

MRAppMaster主要負責MapReduce程序的生命週期,做業管理 , 資源申請和再分配,Container的啓動和釋放

 

2.2  ResourceManager和NodeManager

 (1) ResourceManager(RM)

 RM是一個全局資源管理器,負責整個系統的資源管理和分配。它主要由兩個組件構成:調度器(Scheduler)和應用程序管理器(Applications Manager, ASM)。

  • 調度器

  調度器根據容量、隊列等限制條件(如每一個隊列分配必定的資源,最多執行必定數量的做業等),將系統中的資源分配給各個正在運行的應用程序。

  須要注意的是:該調度器是一個「純調度器,它再也不從事任何與具體應用程序相關的工做,好比不負責監控或者跟蹤應用的執行狀態等,也不負責從新啓動因應用執行失敗或者硬件故障而產生的失敗任務,這些均交由應用程序相關的ApplicationMaster完成。調度器僅根據各個應用程序的資源需求進行資源分配,而資源分配單位用一個抽象概念「資源容器」(Resource Container,簡稱Container)表示,Container是一個動態資源分配單位,它將內存、CPU、磁盤、網絡等資源封裝在一塊兒,從而限定每一個任務使用的資源量。此外,該調度器是一個可插拔的組件,用戶可根據本身的須要設計新的調度器,YARN提供了多種直接可用的調度器,好比Fair Scheduler和Capacity Scheduler等。

  •  應用程序管理器

  應用程序管理器負責管理整個系統中全部應用程序,包括應用程序提交、與調度器協商資源以啓動ApplicationMaster、監控ApplicationMaster運行狀態並在失敗時從新啓動它等

 (2)NodeManager

  a. 彙報本身的節點的資源狀況

  b. 領取屬於本身的任務,運行任務

  c 彙報本身處理的任務的執行狀況

2.3 多角度理解yarn

 2.3.1 並行編程

  在單機程序設計中,爲了快速處理一個大的數據集,一般採用多線程並行編程,其大體流程以下:先由操做系統啓動一個主線程,由他負責數據切分、任務分配、子線程啓動和銷燬等工做,而各個子線程只負責計算本身的數據,當全部子線程處理完數據後,主線程再退出。類比理解,YARN上的應用程序運行過程與之很是相近,只不過它是集羣上的分佈式並行編程。可將YARN看作一個雲操做系統,它負責爲應用程序啓動ApplicationMaster(至關於主線程),而後再由ApplicationMaster負責數據切分、任務分配、啓動監控等工做,而由ApplicationMaster啓動的各個Task(至關於子線程)僅負責本身的計算任務。當全部任務計算完成後,ApplicationMaster認爲應用程序運行完成,而後退出

2.3.2 資源管理系統

 

 2.4 yarn的安裝

 (1)由於安裝了hadoop,因此直接配置yarn-site.xml文件,配置文件以下:

<configuration>
<!-- 配置resourcemanager的機器的位置,這樣全部的nodemanager就能找到其位置 -->
<property>
<name>yarn.resourcemanager.hostname</name>
<value>feng01</value>
</property>
<!-- 爲mr程序提供shuffle服務,提升數據傳輸效率,rpc傳輸效率低 -->
<property>
<name>yarn.nodemanager.aux-services</name>
<value>mapreduce_shuffle</value>
</property>
<!--  一臺NodeManager的總可用內存資源 -->
<property>
<name>yarn.nodemanager.resource.memory-mb</name>
<value>2048</value>
</property>
<!--  一臺NodeManager的總可用(邏輯)cpu核數 -->
<property>
<name>yarn.nodemanager.resource.cpu-vcores</name>
<value>2</value>
</property>
<!--  是否檢查容器的虛擬內存使用超標狀況 -->
<property>
  <name>yarn.nodemanager.vmem-check-enabled</name>
  <value>false</value>
</property>
<!--  容器的虛擬內存使用上限:與物理內存的比率 -->
<property>
  <name>yarn.nodemanager.vmem-pmem-ratio</name>
  <value>2.1</value>
</property>
</configuration>

(2)而後將yarn-site.xml遠程複製到feng02,feng03,feng04

(3)開啓:start-yarn.sh,其會讀取slaves文件,啓動nodemanager

(4) 訪問feng01:8088,便可獲得以下界面

 

 2.5 程序提交到yarn

 2.5.1 window提交

 刪除HDFS根目錄上的全部文件命令hdfs dfs -rm -r hdfs://feng01:9000/*(要寫上絕對路徑)

 以之前的單詞統計爲案例

JobDriver

public class JobDriver {
    public static void main(String[] args) throws Exception {
        // 當前操做的用戶名
        System.setProperty("HADOOP_USER_NAME", "root") ;
        // 獲取mr程序運行時的初始化配置
        Configuration conf = new Configuration();
        // 默認 的狀況程序以本地模式local運行
        // 修改運行模式
        conf.set("mapreduce.framework.name", "yarn");
        // 設置resource manage機器的位置
        conf.set("yarn.resourcemanager.hostname", "feng01");
        //設置操做的文件系統  HDFS
        conf.set("fs.defaultFS", "hdfs://feng01:9000/");
       // windows運行程序在yarn上的誇平臺參數
        conf.set("mapreduce.app-submission.cross-platform", "true");
        
        
        Job job = Job.getInstance(conf);
        // 設置程序的jar包的路徑
        job.setJar("d://wc.jar");
//        job.setJarByClass(JobDriver.class);
        
        // 設置map和reduce類  調用類中自定義的map reduce方法的業務邏輯
        job.setMapperClass(WordCountMapper.class);
        job.setReducerClass(WordCountReducer.class);
        // 設置map端輸出的key-value的類型
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        // 設置reduce的key-value的類型   結果的最終輸出
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        
        
        //設置reducetask的個數  默認1個
        //job.setNumReduceTasks(3);
        
        // 處理的文件的路徑
        FileInputFormat.setInputPaths(job, new Path("/data/wc/input"));
        // 結果輸出路徑
        FileOutputFormat.setOutputPath(job, new Path("/data/wc/output2"));
        // 提交任務   參數等待執行
        job.waitForCompletion(true) ;
    }
}
View Code

WordCountMapper

public class WordCountMapper  extends Mapper<LongWritable, Text, Text, IntWritable>{
    /**
     * 當nextKeyValue()  map
     * map方法是本身的自定義的map階段的業務邏  
     * map方法什麼時候執行???
     *      一行數據執行一次
     *  參數一  當前一行數據的偏移量
     *  參數二 當前這行數據
     *  參數三  context 上下文件  結果的輸出    輸出給reduce
     */
    @Override
    protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, IntWritable>.Context context)
            throws IOException, InterruptedException {
        //獲取一行數據
        String line = value.toString();
        String[] words = line.split(" ");
        for (String word : words) { // 
            context.write(new Text(word), new IntWritable(1));//a  1   a   1   a   1
        }
    }

}
View Code

WordCountReducer

public class WordCountReducer  extends Reducer<Text, IntWritable, Text, IntWritable>{
    /**
     * context.nextKey()  相同的key會被聚合到一個reduce方法中
     * 執行時機  一個單詞執行一次
     * key 單詞 
     * values  將當前的單詞出現的全部的1 存儲在迭代器中
     * 
     */
    @Override
    protected void reduce(Text key, Iterable<IntWritable> values,
            Reducer<Text, IntWritable, Text, IntWritable>.Context context) throws IOException, InterruptedException {
        int count = 0;
        for (IntWritable intWritable : values) {
            count++ ;
        }
        // key是單詞    count是單詞對應的個數
        context.write(key, new IntWritable(count));
    }
}
View Code

 

2.5.2  linux提交

 

public class JobDriver {
    public static void main(String[] args) throws Exception {// 獲取mr程序運行時的初始化配置
        Configuration conf = new Configuration();
        // 默認 的狀況程序以本地模式local運行
        // 修改運行模式
        conf.set("mapreduce.framework.name", "yarn");
        // 設置resource manage機器的位置
        conf.set("yarn.resourcemanager.hostname", "feng01");
        //設置操做的文件系統  HDFS
        conf.set("fs.defaultFS", "hdfs://feng01:9000/"); // 獲取Job對象
        Job job = Job.getInstance(conf);
        // 設置程序的jar包的路徑
        job.setJar("d://wc.jar");
        job.setJarByClass(JobDriver.class);
        
        // 設置map和reduce類  調用類中自定義的map reduce方法的業務邏輯
        job.setMapperClass(WordCountMapper.class);
        job.setReducerClass(WordCountReducer.class);
        // 設置map端輸出的key-value的類型
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        // 設置reduce的key-value的類型   結果的最終輸出
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        
        
        //設置reducetask的個數  默認1個
        //job.setNumReduceTasks(3);
        
        // 處理的文件的路徑
        FileInputFormat.setInputPaths(job, new Path("/data/wc/input"));
        // 結果輸出路徑
        FileOutputFormat.setOutputPath(job, new Path("/data/wc/output2"));
        // 提交任務   參數等待執行
        job.waitForCompletion(true) ;
    }
}

 

 3. merger案例(小文件合併)

 因爲mr程序中map任務的個數是按照文件的個數來決定的,默認是一個map任務處理一個文件,如果不少小文件的話,就須要不少mapTask去處理,這樣就會很浪費資源

HDFS不適合存儲小文件,MR程序不適合處理小文件

 若實在是有太多的小文件,解決方法:

  • 將小文件合併以後再 存儲
  • 若是小文件被存儲在HDFS中,不要使用默認的啓動maptask個數的算法,minSize(100)

 

在HDFS中有個文件夾中存在大量的小文件待處理

1) 默認的狀況下一個文件產生一個maptask任務去處理  產生大量的maptask  慢

2) 小文件存儲在HDFS中  每一個文件都有元數據的記錄  ,在namenode中的元數據是記錄在內存中一份

在存儲的數據必定大小的狀況下增長了namenode的工做壓力

記錄大量的元數據信息  ,整個集羣的存儲能力下降

 

 4 數據傾斜

  在數據按照key進行分區的時候  會產生一個區分的數據特別多  一個特別少  出現了數據傾斜問題。只有數據多的任務結束  任務纔算結束  總體的任務時間很長。

解決方法:

(1)避免分區

(2)key中拼接隨機數

 

 5. join案例

有兩個文件user.txt, order.txt,以下

 

 

 

 需求:獲取oid+user的拼接信息

代碼以下

JoinBean

public class JoinBean implements Writable {

    private String oid;
    private String uid;
    private String name;
    private int age;
    private String fd;
    /**
     * 標識類中存儲的是哪一個文件的數據
     */
    private String table;

    public String getOid() {
        return oid;
    }

    public void setOid(String oid) {
        this.oid = oid;
    }

    public String getUid() {
        return uid;
    }

    public void setUid(String uid) {
        this.uid = uid;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public int getAge() {
        return age;
    }

    public void setAge(int age) {
        this.age = age;
    }

    public String getFd() {
        return fd;
    }

    public void setFd(String fd) {
        this.fd = fd;
    }

    public String getTable() {
        return table;
    }

    public void setTable(String table) {
        this.table = table;
    }

    @Override
    public String toString() {
        return uid + "," + name + "," + age + "," + fd;
    }

    
    
    /**
     * 在序列化的時候有讀寫的字段 讀寫的字段不能爲null
     */
    @Override
    public void readFields(DataInput din) throws IOException {
        this.oid = din.readUTF();
        this.uid = din.readUTF();
        this.name = din.readUTF();
        this.age = din.readInt();
        this.fd = din.readUTF();
        this.table = din.readUTF();

    }

/*    @Override
    public String toString() {
        return "JoinBean [oid=" + oid + ", uid=" + uid + ", name=" + name + ", age=" + age + ", fd=" + fd + ", table="
                + table + "]";
    }*/

    @Override
    public void write(DataOutput dout) throws IOException {
        dout.writeUTF(this.oid);//
        dout.writeUTF(this.uid);
        dout.writeUTF(this.name);
        dout.writeInt(this.age);
        dout.writeUTF(this.fd);
        dout.writeUTF(this.table);

    }

}
View Code

此處的注意點:在序列化的時候有讀寫的字段,讀寫的字段不能爲null

Join

public class Join {

    static class JoinMapper extends Mapper<LongWritable, Text, Text, JoinBean> {
        // 讀取數據 根據文件名 將數據封裝在不一樣的table標識的類
        String name = null ;
        @Override
        protected void setup(Mapper<LongWritable, Text, Text, JoinBean>.Context context)
                throws IOException, InterruptedException {
            FileSplit fs = (FileSplit)context.getInputSplit();
             name = fs.getPath().getName();
        }
        Text k = new Text() ;
        JoinBean joinBean =  new JoinBean() ;
        @Override
        protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, JoinBean>.Context context)
                throws IOException, InterruptedException {
            try {
                String line = value.toString();
                if(name.startsWith("orders")) {// 訂單數據
                    //處理數據
                    String[] split = line.split(",");
                    joinBean.setOid(split[0]);
                    joinBean.setUid(split[1]);
                    joinBean.setName("");
                    joinBean.setAge(-1);
                    joinBean.setFd("");
                    joinBean.setTable("orders");
                }else {// 用戶數據
                    String[] split = line.split(",");
                    joinBean.setOid("");
                    joinBean.setUid(split[0]);
                    joinBean.setName(split[1]);
                    joinBean.setAge(Integer.parseInt(split[2]));
                    joinBean.setFd(split[3]);
                    joinBean.setTable("user");
                }
                
                k.set(joinBean.getUid()) ;
                context.write(k, joinBean);
                
            } catch (Exception e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
            
        }

    }
    static class JoinReducer extends Reducer<Text, JoinBean, Text, NullWritable> {
        // 聚合同一我的的全部的數據
        //iters 裏面既有用戶的訂單數據(多個)    也有用戶數據(1)
        @Override
        protected void reduce(Text uid, Iterable<JoinBean> iters,
                Reducer<Text, JoinBean, Text, NullWritable>.Context context) throws IOException, InterruptedException {
            JoinBean user =  new JoinBean();
            List<JoinBean> ordersList = new ArrayList<>() ;
            // 獲取當前用戶全部的數據
            for (JoinBean joinBean : iters) {
                // 根據這個字段來判斷當前迭代的數據是訂單數據仍是用戶信息數據
                String table = joinBean.getTable();
                if("orders".equals(table)) { //  訂單數據
                    JoinBean order = new JoinBean();
                    order.setOid(joinBean.getOid());
                    order.setUid(joinBean.getUid());
                    //list
                    ordersList.add(order) ;
                }else {// 用戶數據
                    user.setUid(joinBean.getUid());
                    user.setName(joinBean.getName());
                    user.setAge(joinBean.getAge());
                    user.setFd(joinBean.getFd());
                }
                
            }
            
            for (JoinBean joinBean : ordersList) {
                // o0001,uid,name,age,fd
                String key = joinBean.getOid()+","+user.toString() ;
                context.write(new Text(key), NullWritable.get());
            }
        }
    }

    public static void main(String[] args) throws Exception {
        
        // 獲取mr程序運行時的初始化配置
                Configuration conf = new Configuration();
                Job job = Job.getInstance(conf);
                // 設置map和reduce類  調用類中自定義的map reduce方法的業務邏輯
                job.setMapperClass(JoinMapper.class);
                job.setReducerClass(JoinReducer.class);
                // 設置map端輸出的key-value的類型
                job.setMapOutputKeyClass(Text.class);
                job.setMapOutputValueClass(JoinBean.class);
                // 設置reduce的key-value的類型   結果的最終輸出
                job.setOutputKeyClass(Text.class);
                job.setOutputValueClass(NullWritable.class);
                
                //設置reducetask的個數  默認1個
                //job.setNumReduceTasks(3);
                
                // 處理的文件的路徑
                FileInputFormat.setInputPaths(job, new Path("D:\\data\\join\\input"));
                // 結果輸出路徑
                FileOutputFormat.setOutputPath(job, new Path("D:\\data\\join\\res"));
                // 提交任務   參數等待執行
                job.waitForCompletion(true) ;
        
        
    }

}
View Code
相關文章
相關標籤/搜索