Hadoop之YARN簡介

YARN的由來

從Hadoop2開始,官方把資源管理單獨剝離出來,主要是爲了考慮後期做爲一個公共的資源管理平臺,任何知足規則的計算引擎均可以在它上面執行。
因此YARN能夠實現HADOOP集羣的資源共享,不只僅能夠跑MapRedcue,還能夠跑Spark、Flink。java

YARN架構分析

我們以前部署Hadoop集羣的時候也對YARN的架構有了基本的瞭解
YARN主要負責集羣資源的管理和調度 ,支持主從架構,主節點最多能夠有2個,從節點能夠有多個
其中:ResourceManager:是主節點,主要負責集羣資源的分配和管理
NodeManager:是從節點,主要負責當前機器資源管理node

YARN資源管理模型

YARN主要管理內存和CPU這兩種資源類型
當NodeManager節點啓動的時候自動向ResourceManager註冊,將當前節點上的可用CPU信息和內存信息註冊上去。 這樣全部的nodemanager註冊完成之後,resourcemanager就知道目前集羣的資源總量了。linux

那咱們如今來看一下我這個一主兩從的集羣資源是什麼樣子的,打開yarn的8088界面

注意,這裏面顯示的資源是集羣中全部從節點的資源總和,不包括主節點的資源,
那咱們再詳細看一下每個從節點的資源信息
可是這個數值是對不上的,個人linux機器每臺只給它分配了2G的內存,經過free -m能夠看到
CPU只分配了1個,經過top命令能夠看到

那爲何在這裏顯示是內存是8G,CPU是8個呢?
看一下下面這2個參數web

yarn.nodemanager.resource.memory-mb:單節點可分配的物理內存總量,默認是8MB*1024,即8G
yarn.nodemanager.resource.cpu-vcores:單節點可分配的虛擬CPU個數,默認是8複製代碼

看到沒有,這都是默認單節點的內存和CPU信息,就算你這個機器沒有這麼多資源,可是在yarn-default.xml中有這些默認資源的配置,這樣當nodemanager去上報資源的時候就會讀取這兩個參數的值,這也就是爲何咱們在前面看到了單節點都是8G內存和8個cpu,其實咱們的linux機器是沒有這麼大資源的,那你這就是虛標啊,確定不能這樣幹,你實際有多少就是多少,因此咱們能夠修改這些參數的值,修改的話就在yarn-site.xml中進行配置便可,改完以後就能夠看到真實的信息了。面試

YARN中的調度器

接下來咱們來詳細分析一下YARN中的調度器,這個是很是實用的東西,面試的時候也會常常問到。
你們能夠想象一個場景,咱們集羣的資源是有限的,在實際工做中會有不少人向集羣中提交任務,那這時候資源如何分配呢?
若是你提交了一個很佔資源的任務,這一個任務就把集羣中90%的資源都佔用了,後面別人再提交任務,剩下的資源就不夠用了,這個時候怎麼辦?
讓他們等你的任務執行完了再執行?仍是說你把你的資源勻出來一些分給他,你少佔用一些,讓他也能慢慢執行?apache

YARN中支持三種調度器:架構

  • 1:FIFO Scheduler:先進先出(first in, first out)調度策略
  • 2:Capacity Scheduler:FIFO Scheduler的多隊列版本
  • 3:FairScheduler:多隊列,多用戶共享資源

下面來看圖分析一下這三種調度器的特性app

  • FIFO Scheduler:是先進先出的,你們都是排隊的,若是你的任務申請不到足夠的資源,那你就等着,等前面的任務執行結束釋放了資源以後你再執行。這種在有些時候是不合理的,由於咱們有一些任務的優先級比較高,咱們但願任務提交上去馬上就開始執行,這個就實現不了了。
  • CapacityScheduler:它是FifoScheduler的多隊列版本,就是咱們先把集羣中的整塊資源劃分紅多份,咱們能夠人爲的給這些資源定義使用場景,例如圖裏面的queue A裏面運行普通的任務,queueB中運行優先級比較高的任務。這兩個隊列的資源是相互對立的, 可是注意一點,隊列內部仍是按照先進先出的規則。
  • FairScheduler:支持多個隊列,每一個隊列能夠配置必定的資源,每一個隊列中的任務共享其所在隊列的全部資源,不須要排隊等待資源, 具體是這樣的,假設咱們向一個隊列中提交了一個任務,這個任務剛開始會佔用整個隊列的資源,當你再提交第二個任務的時候,第一個任務會把他的資源釋放出來一部分給第二個任務使用

在實際工做中咱們通常都是使用第二種,CapacityScheduler,從hadoop2開始,CapacityScheduler也是集羣中的默認調度器了, 那下面咱們到集羣上看一下,點擊左側的Scheduler查看ide

Capacity,這個是集羣的調度器類型,
下面的root是根的意思,他下面目前只有一個隊列,叫default,咱們以前提交的任務都會進入到這個隊列中。 下面咱們來修改一下,增長多個隊列函數

案例:YARN多資源隊列配置和使用

咱們的需求是這樣的,但願增長2個隊列,一個是online隊列,一個是offline隊列
而後向offline隊列中提交一個mapreduce任務
online隊列裏面運行實時任務
offline隊列裏面運行離線任務,咱們如今學習的mapreduce就屬於離線任務
實時任務咱們後面會學習,等講到了再具體分析。
這兩個隊列其實也是咱們公司中最開始分配的隊列,不過隨着後期集羣規模的擴大和業務需求的增長,後期又增長了多個隊列。
在這裏咱們先增長這2個隊列,後期再增長多個也是同樣的。
具體步驟以下:
修改集羣中etc/hadoop,目錄下的capacity-scheduler.xml配置文件,修改和增長如下參數,針對已有的參數,修改value中的值,針對沒有的參數,則直接增長,這裏的default是須要保留的,增長online,offline,這三個隊列的資源比例爲7:1:2
具體的比例須要根據實際的業務需求來,看大家那些類型的任務比較多,對應的隊列中資源比例就調高一些,咱們如今暫時尚未online任務,因此我就把online隊列的資源佔比設置的小一些。
先修改bigdata01上的配置

[root@bigdata01 hadoop]# vi capacity-scheduler.xml 
<property> <name>yarn.scheduler.capacity.root.queues</name> <value>default,online,offline</value> <description>隊列列表,多個隊列之間使用逗號分割</description> </property> <property> <name>yarn.scheduler.capacity.root.default.capacity</name> <value>70</value> <description>default隊列70%</description> </property> <property> <name>yarn.scheduler.capacity.root.online.capacity</name> <value>10</value> <description>online隊列10%</description> </property> <property> <name>yarn.scheduler.capacity.root.offline.capacity</name> <value>20</value> <description>offline隊列20%</description> </property> <property> <name>yarn.scheduler.capacity.root.default.maximum-capacity</name> <value>70</value> <description>Default隊列可以使用的資源上限.</description> </property> <property> <name>yarn.scheduler.capacity.root.online.maximum-capacity</name> <value>10</value> <description>online隊列可以使用的資源上限.</description> </property> <property> <name>yarn.scheduler.capacity.root.offline.maximum-capacity</name> <value>20</value><description>offline隊列可以使用的資源上限.</description> </property>複製代碼

修改好之後再同步到另外兩個節點上
而後重啓集羣才能生效

[root@bigdata01 hadoop-3.2.0]# sbin/stop-all.sh 
[root@bigdata01 hadoop-3.2.0]# sbin/start-all.sh複製代碼

進入yarn的web界面,查看最新的調度器隊列信息

注意了,如今默認提交的任務仍是會進入default的隊列,若是但願向offline隊列提交任務的話,須要指定隊列名稱,不指定就進默認的隊列
在這裏咱們還須要同步微調一下代碼,不然咱們指定的隊列信息 代碼是沒法識別的
拷貝WordCountJob類,新的類名爲WordCountJobQueue
主要在job配置中增長一行代碼

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;

public class WordCountJobQueue {
    /**
     * 建立自定義mapper類
     */
    public static class MyMapper extends Mapper<LongWritable, Text, Text, LongWritable> {
        Logger logger = LoggerFactory.getLogger(MyMapper.class);
        /**
         * 須要實現map函數
         * 這個map函數就是能夠接收k1,v1, 產生k2,v2
         *
         * @param k1
         * @param v1
         * @param context
         * @throws IOException
         * @throws InterruptedException
         */
        @Override
        protected void map(LongWritable k1, Text v1, Context context) throws IOException, InterruptedException {
            // k1表明的是每一行的行首偏移量,v1表明的是每一行內容
            // 對獲取到的每一行數據進行切割,把單詞切割出來
            String[] words = v1.toString().split(" ");
            logger.info("<k1,v1>=<"+k1.get()+","+v1.toString()+">");
           // System.out.println("<k1,v1>=<"+k1.get()+","+v1.toString()+">");
            for (String word : words) {
                // 迭代切割出來的單詞數據
                Text k2 = new Text(word);
                LongWritable v2 = new LongWritable(1L);
                logger.info("k2:"+word+"...v2:1");
               // System.out.println("k2:"+word+"...v2:1");
                // 把<k2,v2>寫出去 context.write(k2,v2);
                context.write(k2, v2);
            }
        }
    }

    /**
     * 建立自定義reducer類
     */
    public static class MyReducer extends Reducer<Text, LongWritable, Text, LongWritable> {
        Logger logger = LoggerFactory.getLogger(MyReducer.class);
        /**
         * 針對<k2,{v2……}>的數據進行累加求和,而且最終把數據轉化爲k3,v3寫出去
         *
         * @param k2
         * @param v2s
         * @param context
         * @throws IOException
         * @throws InterruptedException
         */
        @Override
        protected void reduce(Text k2, Iterable<LongWritable> v2s, Context context) throws IOException, InterruptedException {
            long sum = 0L;
            for (LongWritable v2 : v2s) {
                logger.info("<k2,v2>=<"+k2.toString()+","+v2.get()+">");
                // System.out.println("<k2,v2>=<"+k2.toString()+","+v2.get()+">");
                sum += v2.get();
            }
            Text k3 = k2;
            LongWritable v3 = new LongWritable(sum);
            logger.info("<k3,v3>=<"+k3.toString()+","+v3.get()+">");
           // System.out.println("<k3,v3>=<"+k3.toString()+","+v3.get()+">");
            context.write(k3, v3);
        }
    }

    public static void main(String[] args) {
        try {

            // job須要的配置參數
            Configuration conf = new Configuration();
            // 解析命令行中經過-D傳遞過來的參數,添加到conf中
            String[] remainingArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
            // 建立一個job
            Job job = Job.getInstance(conf);
            // 注意:這一行必須設置,不然在集羣中執行的是找不到WordCountJob這個類
            job.setJarByClass(WordCountJobQueue.class);
            // 指定輸入路徑(能夠是文件,也能夠是目錄)
            FileInputFormat.setInputPaths(job, new Path(remainingArgs[0]));
            // 指定輸出路徑(只能指定一個不存在的目錄)
            FileOutputFormat.setOutputPath(job, new Path(remainingArgs[1]));
            // 指定map相關的代碼
            job.setMapperClass(MyMapper.class);
            // 指定k2的類型
            job.setMapOutputKeyClass(Text.class);
            // 指定v2的類型
            job.setMapOutputValueClass(LongWritable.class);
            // 指定reduce相關的代碼
            job.setReducerClass(MyReducer.class);
            // 指定k3的類型
            job.setOutputKeyClass(Text.class);
            // 指定v3的類型
            job.setOutputValueClass(LongWritable.class);
            job.waitForCompletion(true);
        } catch (Exception e) {
            e.printStackTrace();
        }

    }
}複製代碼
hadoop jar db_hadoop-1.0-SNAPSHOT-jar-with-dependencies.jar com.cjt.mr.WordCountJobQueue -Dmapreduce.job.queuename=offline /test/hello.txt /out複製代碼


不指定依舊是defalut

hadoop jar db_hadoop-1.0-SNAPSHOT-jar-with-dependencies.jar com.cjt.mr.WordCountJobQueue  /test/hello.txt /out複製代碼

相關文章
相關標籤/搜索