MapReduce擴展:應用程序如何運行於Hadoop Yarn之上

1. 背景
 
「應用程序運行於Hadoop Yarn之上」的需求來源於微博運維數據平臺中的調度系統,即調度系統中的任務須要運行於Hadoop Yarn之上。這裏的應用程序能夠簡單理解爲一個普通的進程(這裏特指Java進程),調度系統中的任務執行實際也是一個進程的運行過程,這裏咱們不討論爲何調度系統中的任務(進程)須要運行於Hadoop Yarn之上,僅僅討論如何使得一個應用程序(進程)能夠運行於Hadoop Yarn之上。
 
應用程序(進程)須要運行於Hadoop Yarn之上,有三種可選的實現方案:
 
(1)擴展實現Yarn Application的兩個組件:Yarn Client、ApplicationMaster;
(2)重用已有的計算框架,如:MapReduce、Spark;
(3)藉助於開源框架,如:Apache Twill;
 
咱們的實現方案最終肯定爲重用已有的計算框架:MapReduce,主要是基於如下幾個因素考慮的:
 
(1)Yarn Application的Yarn Client、ApplicationMaster的擴展實現過程異常複雜,須要對Hadoop Yarn有很是深刻的瞭解,對於咱們的目標而言這種方式過於「大材小用」,畢竟咱們只是但願一個普通的Java進程能夠運行於Hadoop Yarn之上便可;
(2)Apache Twill及相似的開源框架旨在簡化Yarn Application(Yarn Client、ApplicationMaster)的實現過程,但目前均處於孵化器狀態,不建議實際環境中使用;
(3)已有的計算框架中,MapReduce、Spark使用都比較普遍,而Spark使用Scala開發,不太適用於團隊目前的工程背影及與現有系統整合,所以選取了使用Java開發的MapReduce;
 
實現方案肯定以後,咱們的目標再也不是「應用程序如何運行於Hadoop Yarn之上」,變爲「一個普通的Java進程如何運行於Hadoop Yarn之上」,而MapReduce僅僅包含兩種類型的任務:Map Tasks、Reduce Tasks,進一步思考以後,咱們獲得最終目標:「一個普通的Java進程如何以一個MapReduce MapTask的形式運行於Hadoop Yarn之上」。
 
爲何是MapTask,而不是ReduceTask?
 
每個MapTask、ReduceTask都是一個Java進程,宏觀上看,ReduceTask須要運行於MapTask以後,即ReduceTask的運行必須依賴於全部的MapTask結束以後才能夠運行。
 
若是是「一個普通的Java進程以一個MapReduce ReduceTask的形式運行於Hadoop Yarn之上」,則Hadoop Yarn之上至少須要兩個進程:一個MapTask進程和一個ReduceTask進程,這與一個普通的Java進程的初衷是不符的,至關於以前只須要一個進程就能夠完成的任務,如今至少須要兩個進程。
 
MapReduce是能夠不須要ReduceTask的,這能夠經過設置Hadoop MapReduce屬性「mapreduce.job.reduces」的值爲0來實現。
 
2. 實現
 
2.1 YarnApplication
 
一個普通的Java進程如今至關於一個MapReduce MapTask進程,這個「普通的Java進程」的計算邏輯實際能夠是多種多樣的,這就要求咱們須要把MapReduce MapTask進程看做是一個「容器」,運行於其中的應用能夠有各類各樣的計算邏輯,只有這樣MapReduce MapTask的進程才能夠等價於一個普通的Java進程,雖然它包含不少額外的執行過程(與應用計算邏輯無關的)。
 
根據以往的經驗,「容器」中的應用一般須要實現特定的接口(Interface或Abstract class),爲此咱們特地設計了一個抽象類:YarnApplication,用於表示「容器」中的應用。
 
 
屬性
 
context:MapReduce MapTask Context(org.apache.hadoop.mapreduce.Mapper.Context<LongWritable, Text, LongWritable, Text>),即:MapReduce Mapper環境上下文,主要用於獲取Hadoop MapReduce配置屬性值;
 
方法
 
setContext:設置MapReduce Mapper Context(環境上下文);
 
execute:應用的計算邏輯以方法的形式封裝至方法execute,該方法能夠接受任意個字符串形式的參數;
 
簡而言之,一個普通的Java進程若是想運行於Yarn之上,如今僅僅須要實現本身特有的SpecificApplication,SpecificApplication須要繼承自YarnApplication,並重寫其中的execute方法,用於表示應用的計算邏輯,而後由「容器」(MapReduce MapTask)負責完成應用的執行過程,即:SpecificApplication execute的方法調用。
 
 
2.2 AppMapper
 
AppMapper是MapReduce Mapper的一個具體實現,它的功能就是2.1中說起的「容器」,用於完成多種多樣的YarnApplication的執行過程。
 
 
屬性
 
appClass:既然「容器」能夠支持多種多樣的YarnApplication的執行,那麼某一個具體的「容器實例」(即:AppMapper Task)啓動時,須要知道具體執行哪個YarnApplication實例;appClass用於保存YarnApplication實例的徹底限定類名,它的具體值能夠在Hadoop MapRedcue啓動時經過參數「app.class」進行指定;
 
args:用於表示YarnApplication execute方法執行時須要傳遞的參數,它的具體值能夠在Hadoop MapReduce啓動時經過參數「app.args」進行指定;
 
方法
 
setup:AppMapper實例的初始化過程當中獲取appClass與args的具體值;
 
 
map:AppMapper實例的具體執行過程,負責完成YarnApplication execute的調用過程;
 
 
具體的執行過程分爲三步:
 
(1)經過反射加載具體的YarnApplication實現類,並建立相應的實例app;
(2)實例app設置相應的Mapper環境上下文context;
(3)實例app執行execute方法;
 
cleanup:暫時沒有使用;
 
這裏有一點須要額外注意:AppMapper map沒有處理任何的數據輸入輸出。
 
 
2.3 AppInputFormat
 
AppMapper僅僅須要一個Map Task,所以InputSplit的數目爲1;AppMapper map僅僅執行一次,意味着InputSplit的記錄數目爲1;這樣的需求若是使用TextInputFormat,則要求咱們必須在HDFS上存儲一個文本文件,這個文本文件僅僅包含有一行文本,對於咱們的場景而言,太過煩瑣,所以咱們設計實現了專用的InputFormat:AppInputFormat。
 
InputFormat須要有兩個核心組件組成:InputSplit和RecordReader,AppInputFormat也不例外,以下:
 
 
如上所述,AppMapper map沒有處理任何的數據輸入,所以,AppInputFormat須要的InputSplit能夠是「虛擬」的,InputSplit中的記錄也能夠是「虛擬」的。
 
2.3.1 AppInputSplit
 
AppInputSplit就是一個「虛擬」的InputSplit,它沒有引用或關聯任何實際的數據。
 
 
方法
 
getLength:AppInputSplit是「虛擬」的,沒有引用或關聯任何實際的數據,所以數據長度爲0;
 
getLocations:AppInputSplit是「虛擬」的,沒有引用或關聯任何實際的數據,不須要考慮數據本地性的問題,所以僅返回一個「localhost」便可;
 
write、readFields:AppInputSplit不包括任何實例屬性,由於序列化方法(write)和反序列化方法(readFields)爲空便可;
 
2.3.2 AppRecordReader
 
雖然AppInputSplit是「虛擬」的,但它依然須要一個對應的RecordReader,且這個RecordReader須要可以從AppInputSplit中「讀取」到一條記錄,不然AppMapper map方法沒法獲得執行。
 
 
屬性
 
progress:表示AppInputSplit的處理進度,由於AppInputSplit僅僅包含一條「虛擬」記錄,所以progress只有兩個值:0.0和1.0,初始值爲0.0;
key:表示AppInputSplit中的那條「虛擬」記錄的KEY;
value:表示AppInputSplit中的那條「虛擬」記錄的VALUE;
 
方法
 
initialize:初始化AppRecordReader實例,爲空便可;
 
nextKeyValue:用於表示AppInputSplit中是否仍有記錄能夠讀取,若是實例變量key和value均爲null,表示有一條記錄能夠讀取;不然表示讀取完成,progress置爲1.0;
 
getCurrentKey:若是nextKeyValue()方法返回值爲true,表示有一條記錄能夠讀取,getCurrentKey()返回這條記錄的KEY,由於只有一條記錄,咱們這裏將KEY設置爲-1,並保存至實例變量key;
 
getCurrentValue:若是nextKeyValue()方法返回值爲false,表示有一條記錄能夠讀取,getCurrentValue()返回這條記錄的VALUE,由於只有一條記錄,咱們這裏將VALUE設置爲「APP Record」,並保存至實例變量value;
 
getProgress:直接返回實例變量progress的值便可;
 
close:沒有使用,爲空便可;
 
這裏有一點須要注意,AppRecordReader中並無使用到AppInputSplit,這是由於AppInputSplit及其中的記錄均可以理解爲是「虛擬」的,AppRecordReader只須要可以「讀取」到一條記錄便可,至於這條記錄是否是實際包含在AppInputSplit中是可有可無的。
 
2.3.3 AppInputFormat
 
 
方法
 
getSplits:AppMapper僅僅須要一個MapTask,所以只須要以數組的形式返回一個AppInputSplit實例便可;
 
createRecordReader:返回一個AppRecordReader實例便可;
 
 
2.4 AppOutputFormat
 
整個Hadoop MapReduce的運行過程當中僅僅只有一個AppMapper Task,而這個惟一的AppMapper Task沒有任何輸出,所以咱們須要一個「空」的OutputFormat:AppOutputFormat。
 
OutputFormat須要有兩個核心組件組成:RecordWriter和OutputCommitter,AppOutputFormat也不例外,以下:
 
 
2.4.1 AppRecordWriter
 
 
如上所述,由於沒有任何數據輸出,因此AppRecordWriter中的全部方法爲空便可。
 
2.4.2 AppOutputCommitter
 
 
OutputFormat使用OutputCommitter用於「提交」Task的輸出,由於沒有任何數據輸出,因此AppOutputCommitter中的全部方法爲空便可。
 
2.4.3 AppOutputFormat
 
 
方法
 
getRecordWriter:返回一個AppRecordWriter實例便可;
 
checkOutputSpecs:沒有任何數據輸出,爲空便可;
 
getOutputCommitter:返回一個AppOutputCommitter實例便可;
 
 
2.5 AppMapReduce
 
 
AppMapReduce繼承類Configured,並實現接口Tool,至關於MapReduce的一個驅動,可使用如下的方式來運行:
 
 
注:ToolRunner、Configured、Tool是Hadoop MapReduce提供的工具類,方便咱們運行Hadoop MapReduce。
 
 
綜上所述,YarnApplication、AppMapper、AppInputFormat(AppInputSplit、AppRecordReader)、AppOutputFormat(AppRecordWriter、AppOutputCommitter)、AppMapReduce之間的類關係以下圖:
 
 
3. 示例
 
假設咱們一個「HelloWorld」的應用,以一個普通的Java進程的實現方式以下:
 
 
若是想讓這個「HelloWorld」的應用運行於Hadoop Yarn之上,咱們須要實現一個HelloWorldApplication,它繼承自YarnApplication,並重寫其中的execute方法:
 
 
HelloWorldApplication的運行須要一個「驅動程序」:
 
 
這個「驅動程序」的運行方式以下:
 
java -cp conf/:target/scheduler-on-yarn-0.0.1-SNAPSHOT.jar:target/scheduler-on-yarn-0.0.1-SNAPSHOT-lib/* com.weibo.dip.yarnscheduler.example.HelloWorldApplicationExecutor -conf /etc/hadoop-offline/conf/core-site.xml -conf /etc/hadoop-offline/conf/hdfs-site.xml -conf /etc/hadoop-offline/conf/mapred-site.xml -conf /etc/hadoop-offline/conf/yarn-site.xml -D mapreduce.job.queuename=hive -D app.class=com.weibo.dip.yarnscheduler.app.HelloWorldApplication -D app.args=abc,def,ghi
 
參數說明:
 
-conf /etc/hadoop-offline/conf/core-site.xml
-conf /etc/hadoop-offline/conf/hdfs-site.xml
-conf /etc/hadoop-offline/conf/mapred-site.xml
-conf /etc/hadoop-offline/conf/yarn-site.xml
 
這幾個參數用於指定Hadoop集羣的配置文件;
 
-D mapreduce.job.queuename=hive
 
用於指定Hadoop MapReduce運行於Hadoop Yarn中的哪一個隊列;
 
-D app.class=com.weibo.dip.yarnscheduler.app.HelloWorldApplication
 
用於指定YarnApplication的具體實現類,此處爲HelloWorldApplication;
 
-D app.args=abc,def,ghi
 
用於指定HelloWorldApplication運行時所須要的參數,多個參數以「,」進行分隔。
 
代碼參考:
 
相關文章
相關標籤/搜索