從Hadoop2開始,官方把資源管理單獨剝離出來,主要是爲了考慮後期做爲一個公共的資源管理平臺,任何知足規則的計算引擎均可以在它上面執行。
因此YARN能夠實現HADOOP集羣的資源共享,不只僅能夠跑MapRedcue,還能夠跑Spark、Flink。java
我們以前部署Hadoop集羣的時候也對YARN的架構有了基本的瞭解
YARN主要負責集羣資源的管理和調度 ,支持主從架構,主節點最多能夠有2個,從節點能夠有多個
其中:ResourceManager:是主節點,主要負責集羣資源的分配和管理
NodeManager:是從節點,主要負責當前機器資源管理node
YARN主要管理內存和CPU這兩種資源類型
當NodeManager節點啓動的時候自動向ResourceManager註冊,將當前節點上的可用CPU信息和內存信息註冊上去。 這樣全部的nodemanager註冊完成之後,resourcemanager就知道目前集羣的資源總量了。linux
那咱們如今來看一下我這個一主兩從的集羣資源是什麼樣子的,打開yarn的8088界面
注意,這裏面顯示的資源是集羣中全部從節點的資源總和,不包括主節點的資源,
那咱們再詳細看一下每個從節點的資源信息
可是這個數值是對不上的,個人linux機器每臺只給它分配了2G的內存,經過free -m能夠看到
CPU只分配了1個,經過top命令能夠看到
那爲何在這裏顯示是內存是8G,CPU是8個呢?
看一下下面這2個參數web
yarn.nodemanager.resource.memory-mb:單節點可分配的物理內存總量,默認是8MB*1024,即8G yarn.nodemanager.resource.cpu-vcores:單節點可分配的虛擬CPU個數,默認是8複製代碼
看到沒有,這都是默認單節點的內存和CPU信息,就算你這個機器沒有這麼多資源,可是在yarn-default.xml中有這些默認資源的配置,這樣當nodemanager去上報資源的時候就會讀取這兩個參數的值,這也就是爲何咱們在前面看到了單節點都是8G內存和8個cpu,其實咱們的linux機器是沒有這麼大資源的,那你這就是虛標啊,確定不能這樣幹,你實際有多少就是多少,因此咱們能夠修改這些參數的值,修改的話就在yarn-site.xml中進行配置便可,改完以後就能夠看到真實的信息了。面試
接下來咱們來詳細分析一下YARN中的調度器,這個是很是實用的東西,面試的時候也會常常問到。
你們能夠想象一個場景,咱們集羣的資源是有限的,在實際工做中會有不少人向集羣中提交任務,那這時候資源如何分配呢?
若是你提交了一個很佔資源的任務,這一個任務就把集羣中90%的資源都佔用了,後面別人再提交任務,剩下的資源就不夠用了,這個時候怎麼辦?
讓他們等你的任務執行完了再執行?仍是說你把你的資源勻出來一些分給他,你少佔用一些,讓他也能慢慢執行?apache
YARN中支持三種調度器:架構
下面來看圖分析一下這三種調度器的特性app
在實際工做中咱們通常都是使用第二種,CapacityScheduler,從hadoop2開始,CapacityScheduler也是集羣中的默認調度器了, 那下面咱們到集羣上看一下,點擊左側的Scheduler查看ide
Capacity,這個是集羣的調度器類型,
下面的root是根的意思,他下面目前只有一個隊列,叫default,咱們以前提交的任務都會進入到這個隊列中。 下面咱們來修改一下,增長多個隊列函數
咱們的需求是這樣的,但願增長2個隊列,一個是online隊列,一個是offline隊列
而後向offline隊列中提交一個mapreduce任務
online隊列裏面運行實時任務
offline隊列裏面運行離線任務,咱們如今學習的mapreduce就屬於離線任務
實時任務咱們後面會學習,等講到了再具體分析。
這兩個隊列其實也是咱們公司中最開始分配的隊列,不過隨着後期集羣規模的擴大和業務需求的增長,後期又增長了多個隊列。
在這裏咱們先增長這2個隊列,後期再增長多個也是同樣的。
具體步驟以下:
修改集羣中etc/hadoop,目錄下的capacity-scheduler.xml配置文件,修改和增長如下參數,針對已有的參數,修改value中的值,針對沒有的參數,則直接增長,這裏的default是須要保留的,增長online,offline,這三個隊列的資源比例爲7:1:2
具體的比例須要根據實際的業務需求來,看大家那些類型的任務比較多,對應的隊列中資源比例就調高一些,咱們如今暫時尚未online任務,因此我就把online隊列的資源佔比設置的小一些。
先修改bigdata01上的配置
[root@bigdata01 hadoop]# vi capacity-scheduler.xml <property> <name>yarn.scheduler.capacity.root.queues</name> <value>default,online,offline</value> <description>隊列列表,多個隊列之間使用逗號分割</description> </property> <property> <name>yarn.scheduler.capacity.root.default.capacity</name> <value>70</value> <description>default隊列70%</description> </property> <property> <name>yarn.scheduler.capacity.root.online.capacity</name> <value>10</value> <description>online隊列10%</description> </property> <property> <name>yarn.scheduler.capacity.root.offline.capacity</name> <value>20</value> <description>offline隊列20%</description> </property> <property> <name>yarn.scheduler.capacity.root.default.maximum-capacity</name> <value>70</value> <description>Default隊列可以使用的資源上限.</description> </property> <property> <name>yarn.scheduler.capacity.root.online.maximum-capacity</name> <value>10</value> <description>online隊列可以使用的資源上限.</description> </property> <property> <name>yarn.scheduler.capacity.root.offline.maximum-capacity</name> <value>20</value><description>offline隊列可以使用的資源上限.</description> </property>複製代碼
修改好之後再同步到另外兩個節點上
而後重啓集羣才能生效
[root@bigdata01 hadoop-3.2.0]# sbin/stop-all.sh [root@bigdata01 hadoop-3.2.0]# sbin/start-all.sh複製代碼
進入yarn的web界面,查看最新的調度器隊列信息
注意了,如今默認提交的任務仍是會進入default的隊列,若是但願向offline隊列提交任務的話,須要指定隊列名稱,不指定就進默認的隊列
在這裏咱們還須要同步微調一下代碼,不然咱們指定的隊列信息 代碼是沒法識別的
拷貝WordCountJob類,新的類名爲WordCountJobQueue
主要在job配置中增長一行代碼
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.util.GenericOptionsParser; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; public class WordCountJobQueue { /** * 建立自定義mapper類 */ public static class MyMapper extends Mapper<LongWritable, Text, Text, LongWritable> { Logger logger = LoggerFactory.getLogger(MyMapper.class); /** * 須要實現map函數 * 這個map函數就是能夠接收k1,v1, 產生k2,v2 * * @param k1 * @param v1 * @param context * @throws IOException * @throws InterruptedException */ @Override protected void map(LongWritable k1, Text v1, Context context) throws IOException, InterruptedException { // k1表明的是每一行的行首偏移量,v1表明的是每一行內容 // 對獲取到的每一行數據進行切割,把單詞切割出來 String[] words = v1.toString().split(" "); logger.info("<k1,v1>=<"+k1.get()+","+v1.toString()+">"); // System.out.println("<k1,v1>=<"+k1.get()+","+v1.toString()+">"); for (String word : words) { // 迭代切割出來的單詞數據 Text k2 = new Text(word); LongWritable v2 = new LongWritable(1L); logger.info("k2:"+word+"...v2:1"); // System.out.println("k2:"+word+"...v2:1"); // 把<k2,v2>寫出去 context.write(k2,v2); context.write(k2, v2); } } } /** * 建立自定義reducer類 */ public static class MyReducer extends Reducer<Text, LongWritable, Text, LongWritable> { Logger logger = LoggerFactory.getLogger(MyReducer.class); /** * 針對<k2,{v2……}>的數據進行累加求和,而且最終把數據轉化爲k3,v3寫出去 * * @param k2 * @param v2s * @param context * @throws IOException * @throws InterruptedException */ @Override protected void reduce(Text k2, Iterable<LongWritable> v2s, Context context) throws IOException, InterruptedException { long sum = 0L; for (LongWritable v2 : v2s) { logger.info("<k2,v2>=<"+k2.toString()+","+v2.get()+">"); // System.out.println("<k2,v2>=<"+k2.toString()+","+v2.get()+">"); sum += v2.get(); } Text k3 = k2; LongWritable v3 = new LongWritable(sum); logger.info("<k3,v3>=<"+k3.toString()+","+v3.get()+">"); // System.out.println("<k3,v3>=<"+k3.toString()+","+v3.get()+">"); context.write(k3, v3); } } public static void main(String[] args) { try { // job須要的配置參數 Configuration conf = new Configuration(); // 解析命令行中經過-D傳遞過來的參數,添加到conf中 String[] remainingArgs = new GenericOptionsParser(conf, args).getRemainingArgs(); // 建立一個job Job job = Job.getInstance(conf); // 注意:這一行必須設置,不然在集羣中執行的是找不到WordCountJob這個類 job.setJarByClass(WordCountJobQueue.class); // 指定輸入路徑(能夠是文件,也能夠是目錄) FileInputFormat.setInputPaths(job, new Path(remainingArgs[0])); // 指定輸出路徑(只能指定一個不存在的目錄) FileOutputFormat.setOutputPath(job, new Path(remainingArgs[1])); // 指定map相關的代碼 job.setMapperClass(MyMapper.class); // 指定k2的類型 job.setMapOutputKeyClass(Text.class); // 指定v2的類型 job.setMapOutputValueClass(LongWritable.class); // 指定reduce相關的代碼 job.setReducerClass(MyReducer.class); // 指定k3的類型 job.setOutputKeyClass(Text.class); // 指定v3的類型 job.setOutputValueClass(LongWritable.class); job.waitForCompletion(true); } catch (Exception e) { e.printStackTrace(); } } }複製代碼
hadoop jar db_hadoop-1.0-SNAPSHOT-jar-with-dependencies.jar com.cjt.mr.WordCountJobQueue -Dmapreduce.job.queuename=offline /test/hello.txt /out複製代碼
不指定依舊是defalut
hadoop jar db_hadoop-1.0-SNAPSHOT-jar-with-dependencies.jar com.cjt.mr.WordCountJobQueue /test/hello.txt /out複製代碼