Hadoop yarn工做流程詳解

yarn是什麼?
一、它是一個資源調度及提供做業運行的系統環境平臺
資源:cpu、mem等
做業:map task、reduce Taskjava


yarn產生背景?
它是從hadoop2.x版本才引入
一、hadoop1.x版本它是如何資源調度及做業運行機制原理
a、JobTracker(主節點)
(a):接受客戶端的做業提交
(b):交給任務調度器安排任務的執行
(c):通知空閒的TaskTracker去處理
(d): 與TaskTracker保持心跳機制node


b、TaskTracker(從節點)
(a):執行map task和reduce task
(b): 與JobTracker保持心跳機制apache

缺點:
一、單點故障問題
二、負載壓力
三、只能運行mapreduce的程序app

引入了yarn機制
一、減小負載壓力
二、主備機制
三、支持不一樣的程序運行ide

yarn總體的架oop


yarn主要的核心組件?idea

 

 


resourcemanagerspa


做用:
(1)接受客戶端提交做業
(2)啓動一個app master去處理
資源分配
(3)監控nodemanager3d


 

nodemanagercode


做用:
(1)管理單個節點上的資源
(2)接受resourcemanager發送過來的指令
(3)接受app master發送過來的指令
(4) 啓動Container


 

app master


(1)運行做業的主控者
(2)獲取切片數據
(3)從resourcemanager審請運行做業資源
(4)監控做業運行的狀態


 

Container


它其實就是一個虛擬主機的抽象,分配cpu和內存,主要運行做業

 

app master
Container
Client


 


yarn的工做機制(重點)
一、鏈接運行器平臺
根據mapreduce.framework.name變量配置
若是等於yarn:則建立YARNRunner對象
若是等於Local:則建立LocalJobRunner對象

二、若是是yarn平臺,對resoucemanager提交做業審請
三、resourcemanager返回一個jobid和數據保存目錄(hdfs://xxx/staging/xxx)
四、客戶端根據返回數據保存目錄路徑,將job.split、job.xml、jar文件提交到hdfs://xxx/staging/xxx目錄
五、提交數據資源以後,客戶端對resouremanager提交任務運行
六、resourcemanager將任務存儲任務隊列
七、resourcemanager發送命令nodemanager處理從任務取出的任務
八、nodemanager往resourcemanageer審請我要建立一個app master
a、在nodemanager建立一個container,再啓動app master
九、app master讀取數據切片處理方案
十、app master往resourcemanager審請運行資源
十一、resourcemanager往空閒的nodemanager主機發送指令,要建立Container
十二、app master往nodemanger發送運行指令,container運行任務。

以下圖:

 

 


是否能夠直接從本地idea直接將程序運行到yarn平臺?

以wordcount爲例:

代碼以下:

package com.gec.demo;


import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

/*
* 做用:體現mapreduce的map階段的實現
* KEYIN:輸入參數key的數據類型
* VALUEIN:輸入參數value的數據類型
* KEYOUT,輸出key的數據類型
* VALUEOUT:輸出value的數據類型
*
* 輸入:
*      map(key,value)=偏移量,行內容
*
* 輸出:
*      map(key,value)=單詞,1
*
* 數據類型:
* java數據類型:
* int-------------->IntWritable
* long------------->LongWritable
* String----------->Text
* 它都實現序列化處理
*
 * */
public class WcMapTask extends Mapper<LongWritable, Text,Text, IntWritable>
{
    /*
    *根據拆分輸入數據的鍵值對,調用此方法,有多少個鍵,就觸發多少次map方法
    * 參數一:輸入數據的鍵值:行的偏移量
    * 參數二:輸入數據的鍵對應的value值:偏移量對應行內容
    * */
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {

        String line=value.toString();

        String words[]=line.split(" ");

        for (String word : words) {

            context.write(new Text(word),new IntWritable(1));
        }

    }
}
package com.gec.demo;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

/*
* 此類:處理reducer階段
*   彙總單詞次數
* KEYIN:輸入數據key的數據類型
* VALUEIN:輸入數據value的數據類型
* KEYOUT:輸出數據key的數據類型
* VALUEOUT:輸出數據value的數據類型
*
*
* */
public class WcReduceTask extends Reducer<Text, IntWritable,Text,IntWritable>
{
    /*
    * 第一個參數:單詞數據
    * 第二個參數:集合數據類型彙總:單詞的次數
    *
    * */
    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {

        int count=0;

        for (IntWritable value : values) {

            count+=value.get();
        }

        context.write(key,new IntWritable(count));

    }
}
package com.gec.demo;


import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class WcCombiner extends Reducer<Text, IntWritable,Text,IntWritable> {
    private IntWritable sum=new IntWritable();
    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
        int count=0;

        for (IntWritable value : values) {
            count+=value.get();
        }
        sum.set(count);
        context.write(key,sum);
    }
}
package com.gec.demo;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

/**
 * Hello world!
 *
 */
public class App 
{
    public static void main( String[] args ) throws IOException, ClassNotFoundException, InterruptedException {

        Configuration conf=new Configuration();
//        conf.set("fs.defaultFS","hdfs://hadoop-001:9000");
//        conf.set("mapreduce.framework.name","yarn");
//        conf.set("yarn.resourcemanager.hostname","hadoop-002");
        conf.set("mapred.jar","D:\\JAVA\\projectsIDEA\\BigdataStudy\\mrwordcountbyyarn\\target\\wordcountbyyarn-1.0-SNAPSHOT.jar");
        Job job=Job.getInstance(conf);
        //設置Driver類
        job.setJarByClass(App.class);

        //設置運行那個map task
        job.setMapperClass(WcMapTask.class);
        //設置運行那個reducer task
        job.setReducerClass(WcReduceTask.class);
        job.setCombinerClass(WcCombiner.class);

        //設置map task的輸出key的數據類型
        job.setMapOutputKeyClass(Text.class);
        //設置map task的輸出value的數據類型
        job.setMapOutputValueClass(IntWritable.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        //指定要處理的數據所在的位置
        FileInputFormat.setInputPaths(job, "/wordcount/input/big.txt");
        //指定處理完成以後的結果所保存的位置
        FileOutputFormat.setOutputPath(job, new Path("/wordcount/output7"));
        //向yarn集羣提交這個job
        boolean res = job.waitForCompletion(true);
        System.exit(res?0:1);

    }
}

 

其中

 

是由於在resource文件夾中直接添加配置文件

配置文件分別以下:

 

core-site.xml

 

hdfs-site.xml
mapred-site.xml
yarn-site.xml

 

 

 

 注意:這裏的配置文件要和虛擬機中的配置文件同樣,不然可能會出錯,最好的作法是從虛擬機中直接copy出來

相關文章
相關標籤/搜索