yarn是什麼?
一、它是一個資源調度及提供做業運行的系統環境平臺
資源:cpu、mem等
做業:map task、reduce Taskjava
yarn產生背景?
它是從hadoop2.x版本才引入
一、hadoop1.x版本它是如何資源調度及做業運行機制原理
a、JobTracker(主節點)
(a):接受客戶端的做業提交
(b):交給任務調度器安排任務的執行
(c):通知空閒的TaskTracker去處理
(d): 與TaskTracker保持心跳機制node
b、TaskTracker(從節點)
(a):執行map task和reduce task
(b): 與JobTracker保持心跳機制apache
缺點:
一、單點故障問題
二、負載壓力
三、只能運行mapreduce的程序app
引入了yarn機制
一、減小負載壓力
二、主備機制
三、支持不一樣的程序運行ide
yarn總體的架構?oop
yarn主要的核心組件?idea
resourcemanagerspa
做用:
(1)接受客戶端提交做業
(2)啓動一個app master去處理
資源分配
(3)監控nodemanager3d
nodemanagercode
做用:
(1)管理單個節點上的資源
(2)接受resourcemanager發送過來的指令
(3)接受app master發送過來的指令
(4) 啓動Container
app master
(1)運行做業的主控者
(2)獲取切片數據
(3)從resourcemanager審請運行做業資源
(4)監控做業運行的狀態
Container:
它其實就是一個虛擬主機的抽象,分配cpu和內存,主要運行做業
app master
Container
Client
yarn的工做機制(重點)
一、鏈接運行器平臺
根據mapreduce.framework.name變量配置
若是等於yarn:則建立YARNRunner對象
若是等於Local:則建立LocalJobRunner對象
二、若是是yarn平臺,對resoucemanager提交做業審請
三、resourcemanager返回一個jobid和數據保存目錄(hdfs://xxx/staging/xxx)
四、客戶端根據返回數據保存目錄路徑,將job.split、job.xml、jar文件提交到hdfs://xxx/staging/xxx目錄
五、提交數據資源以後,客戶端對resouremanager提交任務運行
六、resourcemanager將任務存儲任務隊列
七、resourcemanager發送命令nodemanager處理從任務取出的任務
八、nodemanager往resourcemanageer審請我要建立一個app master
a、在nodemanager建立一個container,再啓動app master
九、app master讀取數據切片處理方案
十、app master往resourcemanager審請運行資源
十一、resourcemanager往空閒的nodemanager主機發送指令,要建立Container
十二、app master往nodemanger發送運行指令,container運行任務。
以下圖:
是否能夠直接從本地idea直接將程序運行到yarn平臺?
以wordcount爲例:
代碼以下:
package com.gec.demo; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; import java.io.IOException; /* * 做用:體現mapreduce的map階段的實現 * KEYIN:輸入參數key的數據類型 * VALUEIN:輸入參數value的數據類型 * KEYOUT,輸出key的數據類型 * VALUEOUT:輸出value的數據類型 * * 輸入: * map(key,value)=偏移量,行內容 * * 輸出: * map(key,value)=單詞,1 * * 數據類型: * java數據類型: * int-------------->IntWritable * long------------->LongWritable * String----------->Text * 它都實現序列化處理 * * */ public class WcMapTask extends Mapper<LongWritable, Text,Text, IntWritable> { /* *根據拆分輸入數據的鍵值對,調用此方法,有多少個鍵,就觸發多少次map方法 * 參數一:輸入數據的鍵值:行的偏移量 * 參數二:輸入數據的鍵對應的value值:偏移量對應行內容 * */ @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { String line=value.toString(); String words[]=line.split(" "); for (String word : words) { context.write(new Text(word),new IntWritable(1)); } } }
package com.gec.demo; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer; import java.io.IOException; /* * 此類:處理reducer階段 * 彙總單詞次數 * KEYIN:輸入數據key的數據類型 * VALUEIN:輸入數據value的數據類型 * KEYOUT:輸出數據key的數據類型 * VALUEOUT:輸出數據value的數據類型 * * * */ public class WcReduceTask extends Reducer<Text, IntWritable,Text,IntWritable> { /* * 第一個參數:單詞數據 * 第二個參數:集合數據類型彙總:單詞的次數 * * */ @Override protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException { int count=0; for (IntWritable value : values) { count+=value.get(); } context.write(key,new IntWritable(count)); } }
package com.gec.demo; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer; import java.io.IOException; public class WcCombiner extends Reducer<Text, IntWritable,Text,IntWritable> { private IntWritable sum=new IntWritable(); @Override protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException { int count=0; for (IntWritable value : values) { count+=value.get(); } sum.set(count); context.write(key,sum); } }
package com.gec.demo; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import java.io.IOException; /** * Hello world! * */ public class App { public static void main( String[] args ) throws IOException, ClassNotFoundException, InterruptedException { Configuration conf=new Configuration(); // conf.set("fs.defaultFS","hdfs://hadoop-001:9000"); // conf.set("mapreduce.framework.name","yarn"); // conf.set("yarn.resourcemanager.hostname","hadoop-002"); conf.set("mapred.jar","D:\\JAVA\\projectsIDEA\\BigdataStudy\\mrwordcountbyyarn\\target\\wordcountbyyarn-1.0-SNAPSHOT.jar"); Job job=Job.getInstance(conf); //設置Driver類 job.setJarByClass(App.class); //設置運行那個map task job.setMapperClass(WcMapTask.class); //設置運行那個reducer task job.setReducerClass(WcReduceTask.class); job.setCombinerClass(WcCombiner.class); //設置map task的輸出key的數據類型 job.setMapOutputKeyClass(Text.class); //設置map task的輸出value的數據類型 job.setMapOutputValueClass(IntWritable.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); //指定要處理的數據所在的位置 FileInputFormat.setInputPaths(job, "/wordcount/input/big.txt"); //指定處理完成以後的結果所保存的位置 FileOutputFormat.setOutputPath(job, new Path("/wordcount/output7")); //向yarn集羣提交這個job boolean res = job.waitForCompletion(true); System.exit(res?0:1); } }
其中
是由於在resource文件夾中直接添加配置文件
配置文件分別以下:
注意:這裏的配置文件要和虛擬機中的配置文件同樣,不然可能會出錯,最好的作法是從虛擬機中直接copy出來