系列文章:
大數據系列:一文初識Hdfs
大數據系列2:Hdfs的讀寫操做
大數據謝列3:Hdfs的HA實現html
經過前文,咱們對Hdfs的已經有了必定的瞭解,本文將繼續以前的內容,介紹Yarn
與Yarn
在MapReduce 2
的應用node
在介紹Yarn
以前,咱們先介紹一下Mapreduce1
做業流程。算法
有了這個基礎,再去看看採用Yarn
之後的MapReduce2
到底有啥優點也許會有更好的理解。apache
首先先介紹一下相關的幾個實體:緩存
Client
:負責提交 MapReduce
做業jobtracker
:協調做業運行,是一個Jave程序,主類爲JobTracker
tasktracker
:運行做業劃分後的任務,是一個Jave程序,主類爲TaskTracker
Hdfs
:分佈式文件系統,用於在其餘實體之間共享做業文件做業流程圖以下:服務器
MapReduce Program
調用runJob()
建立JobClient
並告知其提交做業。
在提交做業後runJob()
會每秒輪詢做業進度,若是發生改變就把進度輸出控制檯。
做業成後輸出做業計數器,若是失敗,則輸出失敗信息。架構
JobClient
經過調用JobTracker.getNewJobId()
請求一個新的JoobId
。app
將運行做業須要的資源(做業Ja
r文件,配置文件,計算所得的輸入分片)複製到以JobId
命名的目錄下jobtracker
的HDFS
中。
做業Jar的會有多個副本(mapred.submit.replication
默認10),在運行做業的時候,tasktracker
能夠訪問多個副本。框架
調用JobTracker.submitJob()
方法告知jobtracker
做業準備執行。運維
JobTracker
接收到對submitJob()
的調用後,會把改調用放入一個內部隊列,交由做業調度起(job scheduler
)進行調度。
同時會對Job
初始化,包括建立一個表示Job
正在運行的對象,用來封裝任務和記錄的信息,用於追蹤任務的狀態和進程。
爲了建立人物運行列表,做業調度起會從共享文件系統中獲取JobCient
已經計算好的輸入分片信息。
而後爲每個分片建立一個map
任務。
至於reduce
任務則由JonConf
的mapred.reduce.task
決定,經過setNumReduceTask()
設置,
而後調度器建立相應數量的reduce
任務。
此時會被指定任務ID
tasktracker
與jobtracker
之間維持一個心跳,
做爲消息通道,tasktracker
或告知自身存活狀況與是否能夠運行新的任務。
根據信息,jobtracker
會決定是否爲tasktracker
分配任務(經過調度算法)。
這個過程當中,對於map
任務會考慮數據本地性,對於reduce
則不須要。
一旦tasktracker
被分配了任務,接下里就是執行,首先經過Hdfs
把做業的Jar
文件複製到tasktracker
所在的文件系統。
實現做業Jar
本地化。
同時,tasktracker
把須要的文件從Hdfs
複製到本地磁盤。
而後爲任務創建一個本地工做目錄,並將Jar
中的呢容解壓到這裏。
最後建立一個TaskRunner
實例運行該任務。
TaskerRunner
啓動一個新的JVM
用來運行每個任務。
分別執行MapTask
或者ReduceTask
,結束後告知TaskTracker
結束信息,同時TaskTracker
將該信息告知JobTracker
上面就是Maopreduce1
做業運行的流程。咱們先有個概念,後面介紹Yarn
的時候作下對比。
這裏說的
Mapreduce1
指的是Hadoop
初始版本(版本1以及更早的)中的Mapreduce
分佈式執行框架,也就是咱們上面的做業流程。
Mapreduce2
指的是使用Yarn
(Hadoop 2
以及之後版本)的Mapreduce
執行方式。
這裏Mapreduce一、Mapreduce2
指的不是Hadoop
版本,指的是Mapreduce
程序的不一樣執行機制而已。
Yarn (Yet Another Resource Negotiator)
是在Hadoop 2
引入的集羣資源管理系統,最初的目的是爲了改善MapReduce
的實現。
可是因爲其具備強大的通用性,能夠支持其餘的分佈式計算框架。
在引入的Yarn
後,Hadoop 2
的生態就發生了一變化,以下:
Yarn
提供請求和使用集羣資源的API
,可是通常都是由分佈式框架(Saprk、Flink
等)內部調用這些API
,
用戶則使用分佈式系統提供的更高層的API
。
這種方式向用戶隱藏了資源管理的細節,必定程度上下降了開發難度和運維成本。
Yarn
的結構很簡單,以下
Yarn
的核心思想是將資源管理和做業調度/監視功能拆分爲單獨的守護進程。
具體實現就是:
一個管理集羣上資源使用的全局資源管理器(RM,ResourceManager
);
運行在集羣全部結點上而且可以啓動和監控容器(Container
)的結點管理器(Node Manager
)
Container
是用於執行特定應用程序的進程,每一個資源都有資源限制(內存、CPU等)
Container
能夠是Unix
進程,也能夠Linux cgroup
Yarn的組成介紹就這麼簡單,接下來咱們就看看它怎麼提交執行一個任務。
這裏分爲兩部分,
第一部分會介紹Yarn任務提交流程,
第二部分會介紹Mapreduce 2 的提交流程
Yarn
任務的提交流程以下:
爲了在Yarn
上運行任務,Client
會向ResourceManager
發出運行 Application Master process
的請求。
Resource Manager
找到一個能夠運行Application Master
的NodeManager
。
NodeMager
啓動一個容器,運行Application Master
。
此時Application Master
會作什麼操做取決於Application
自己,
能夠是在在Application Master
執行一個簡單計算任務,將結果返回Client
,
也能夠向Resource Manager
申請更多容器。
申請到更多的container
。
從上面的步驟能夠發現,Yarn
自己是不會爲應用的各個部分(Client, Master, Process
)之間提供交互。
大多數基於Yarn
的任務使用某些遠程通訊機制(好比Hadoop RPC
)向客戶端傳遞信息。
這些RPC
通訊機制通常都是專屬於該應用的。
有了上面的基礎,具體的應用怎麼提交。
此處選用MapReduce 2
,與一開始MapReduce 1
作個對比
涉及到五個實體:
Client
:提交 MapReduce job
的客戶端YARN Resource Manager
:負責協調分配集羣計算資源YARN Node Managers
:啓動並監視集羣中機器上的計算容器。MapReduce Application Master
:協調MapReduce job
的任務執行。HDFS
:用於在其餘實體之間共享Job
文件
Application Master
和MapReduce Tasks
在容器中運行,他們由Resource Manager
調度,由Node Managers
管理
提交流程以下:
Job.sumbit()
方法建立一個內部的JobSummiter
實例,並調用其sunbmitJobInternal()
方法。
做業提交後,waitForCompletion()
會每秒輪詢返回做業的進度。
若是做業完成後,若是成功則顯示做業計數器,不然輸出錯誤。
JobSummiter
向Resource Manager
申請一個用於 MapReduce job ID
的新Application ID
。
這個過程會檢查做業,輸出說明:
例如,若是沒有指定輸出目錄,或者輸出目錄已經存在,則不會提交做業,並向MapReduce
程序拋出錯誤;
計算做業的輸入分片。
若是沒法計算分片(例如,由於輸入路徑不存在),則做業不提交,並向MapReduce
程序拋出錯誤。
將運行做業須要的資源(做業Jar
文件,配置文件,計算所得的輸入分片)複製到以JobId
命名的HDFS
的目錄下。
做業Jar
的會有多個副本(mapreduce.client.submit.file.replication
默認10),
當Node Managers
運行任務時,能夠跨集羣訪問許多副本。
經過調用Resource Manager
的submitApplication()
提交任務。
Resource Manager
收到submitApplication()
的調用請求後,將請求傳遞給Yarn
的調度器(Scheduler
)。
Scheduler
會爲其分配一個容器,
Node Manager
在容器中啓動一個Application Master
,主類爲MRAppMaster
。
因爲MRAppMaster
將從任務接收進度和完成報告,它經過建立許多簿記對象(bookkeeping objects
)來初始化做業,以跟蹤做業的進度。
接下來,MRAppMaster
從共享文件系統檢索在客戶機中計算的輸入切片,
它會爲每一個切片創建一個map task;
創建mapreduce.job.reduces
(由Job.setNumReduceTasks()
)數量的reduce task
。
MRAppMaster
根據任務的狀況決定是執行一個uber task
仍是向Resource Manager
請求更多的資源。
MRAppMaster
向Resource Manager
爲job
中全部的map、reduce tasks
申請容器。
一旦Resource Manager
的Scheduler
爲task
在指定的Node Manager
分配了容器之後,Application Master
就會請求Node Manager
分配容器。
Node Manager
收到請,啓動容器。容器中的主類爲YarnChild
,運行在專用的JVM
中,因此map、reduce
、甚至YarnChild
自己出現的錯誤都不會影響Node Manager
。
在運行task
以前,YarnChild
會對任務須要的資源進行本地化,包括job
配置、JAR
文件以及其餘來自Hdfs
的文件。
最後執行map 或 reduce
任務。
關於的ubertask
細節說明:
MRAppMaster
必須決定如何運行MapReduce job
。
利用並行的優點,確實能夠提升任務的執行效率,
可是在小任務或少任務的狀況下,
在新的容器中分配和運行任務所額外消耗的時間大於並行執行帶來效率的提高。
這個時候在一個節點上順序運行這些任務反而能得到更好的效率。
這樣的job被稱爲uber task
簡單的說就是並行執行的時候任務效率的提高還不夠彌補你從新申請資源、建立容器、分發任務等消耗的時間。
那麼怎樣纔算small job
呢?
默認狀況下:small job
是有少於10個mapper
,只有一個reducer
,一個輸入大小小於一個HDFS Block
大小的job。
固然也能夠經過參數 mapreduce.job.ubertask.maxmaps
,mapreduce.job.ubertask.maxreduces
, mapreduce.job.ubertask.maxbytes
進行設置。
對於Ubertasks
,mapreduce.job.ubertask.enable
必須設置爲true
。
對於步驟9補充說明:
在這個過程當中,會先申請map
任務的容器,
由於全部的map任務都必須在reduce的排序階段開始以前完成(Shuffle and Sort機制
)。
對reduce
任務的請求直到5%的map
任務完成纔會發出(reduce slow start機制
)。
對於reduce
任務,能夠在集羣的任何結點運行,
可是對map
任務,會有數據本地性的要求(詳情此處不展開)
申請還爲任務指定內存和cpu
。默認狀況下,每一個map
和reduce
任務分配1024 MB
內存和1個虛擬核,
能夠經過mapreduce.map.memory.mb
, mapreduce.reduce.memory.mb
, mapreduce.map.cpu.vcores
和 mapreduce.reduce.cpu.vcores
進行配置
上面就是Mapreduce2
的任務提交執行流程,一開始咱們就介紹了Mapreduce1
,如今咱們對比下兩個有啥區別。
本質就是結合
Mapreduce 2
對比Yarn
與Mapreduce1
調度的區別,因此後面Mapreduce 2
直接用Yarn
替換
Mapreduce 1
中,做業執行過程由兩類守護進程控制,分別爲一個jobtracker
和多個tasktracker
。
jobtracker
經過調度tasktracker
上的任務來協調運行在系統的Job
,並記錄返回的任務進度。
tasktracker
負責運行任務並向`jobtracker``發送任務進度。
jobtracker
同時負責做業的調度(分配任務與tasktracker
匹配)和任務進度監控(任務跟蹤、失敗重啓、記錄流水、維護進度、計數器等)
Yarn
中,也有兩類守護進程Resource Manager
和Nonde Manager
分別類比jobtracker
和tasktracker
。
可是不同的地方在於,jobtracker
的職責在Yarn
中被拆分,由兩個實體Reource Manger
和``Application Master```(每一個Job有一個)。
jobtracker
也能夠存儲做業歷史,或者經過運行一個獨立守護進程做爲歷史做業服務器。而與對應的,Yarn
提供一個時間軸服務器(timeline server
)來存儲應用的歷史。
兩者組成類比
Mapreduce 1 | Yarn |
---|---|
jobtracker |
Reource Manger Application Master timeline server |
tasktracker |
Nonde Manager |
Slot |
container |
對於兩者的區別,心血來潮想了個例子,但願可以幫助理解。
有三個角色:皇帝、大臣、打工人
如今有兩個狀況,
1:發生水災,須要賑災
2:敵寇入侵,邊疆告急
在這種狀況向 Mapreduce 1
的作法是:
Yarn
的作法:
簡單的說,就是Yarn
讓專業的人作專業的事情。
遇到事情找個專家,我只負責提需求和提供資源,
其餘的讓專家去作。
這個專家就是
MRAppMaster(Mapreduce)
,而對應的Spark也有本身的專家
由此也已總結下Yarn
帶來的優點:
Scalability
)Yarn
能夠在比MapReduce 1
更大的集羣上運行。MapReduce 1
在4000個節點和40000個任務的時候達到拓展性的瓶頸。jobtracker
須要管理做業和任務。Yarn
就拆分了這個,將做業與任務拆分,由Manager/Application Master
分別負責,能夠輕鬆將拓展至10,000 個節點 100,000 個任務。Availability)
jobtracker
的內存中有大量快速變化的複雜狀態(例如,每一個任務狀態每隔幾秒更新一次),這使得將在jobtracker
服務配置HA很是困難。Yarn
而言,因爲職責被拆分,那麼HA
也隨之變成了分治問題。Resource Manager
的HA,同時若是有須要能夠爲每一個人應用也提供HA。實際上對於
Mapreduce 2
對Resource Manager
和Application Master
都提供了HA,稍候介紹。
Utilization
)MapReduce 1
中,每一個tasktracker
都靜態配置若干個slot
,在配置的時候被劃分爲map slot
和reduce slot
,只能執行固定的任務。Yarn
中,Node Manager
管理一個資源池,只要有資源,任務就能夠運行。Multitenancy
)Hadoop
纔是Yarn
的最大優點。Spark、Flink
等等。此時MapReduce
也僅僅是在這之上的一個應用罷了接下來在說一下HA吧。
這裏主要結合Mapreduce 2
來講明
HA 針對的都是出現錯誤或失敗的狀況。
在咱們這裏,出現錯誤或失敗的場景有如下幾個
Application Master
失敗Node Manager
失敗Resource Manager
失敗接下來咱們分別看看這些狀況怎麼解決。
任務失敗的狀況有可能出現下面的狀況:
map、reduce task
代碼問題,這種失敗最多見,此時在task JVM
在退出前會向Application Master
發送錯誤報告,該報告會被計入用戶日誌。最後Application Master
會將該任務將任務嘗試標記爲failed
,並釋放容器,使其資源可供另外一個任務使用。task JVM
忽然退出,可能存在一個JVM bug
,致使JVM在特定環境下退出MapReduce
的用戶代碼。Node Manager
發現進程已經退出,會告知Application Master
,並將任務嘗試標記爲failed
。Application Master
注意到有一段時間沒有收到任務進度更新了,就會將該任務標記爲failed
,由參數mapreduce.task.timeout
(默認10分鐘,0表示關閉超時,此時長時間運行任務永遠不會標記爲failed
,慎用)設置。task 失敗的處理方式:
Application Master
發現任務失敗後,會從新調度該任務,會進行避免在以前失敗的Node Manager
上調度該任務。mapreduce.map.maxattempts
,mapreduce.reduce.maxattempts
),就不會繼續重調,整個Job也就失敗。mapreduce.map.failures.maxpercent
、mapreduce.reduce.failures.maxpercent
設置。Application Master
制動標記killed
不屬於任務失敗。推測機制(Speculative Execution),若是發現task執行的時間運行速度明顯慢於平均水平,就會在其餘的結點啓動一個相同的任務,稱爲推測執行。
這個不必定有效,僅僅是投機性的嘗試。
當任務成功完成時,任何正在運行的重複任務都將被終止,由於再也不須要它們。
就是推測任務與原始任務誰能上位就看誰先完成了。
當遇到Application Master
失敗是,Yarn
也會進行嘗試。
能夠經過配置mapreduce.am.max-attempts property
(默認:2)配置重試次數,
同時,Yarn
對於集羣中運行的Application Master
最大嘗試次數加了限制,也須要經過 yarn.resourcemanager.am.max-attempts
(默認:2)進行配置。
重試的流程以下:
Application Master
向Resource Manager
發送心跳,若是Application Master
發生故障,Resource Manager
將檢測故障,並在新的容器中啓動運行Application Master
的新實例
在MapReduce,它可使用做業歷史記錄來恢復(失敗的)應用程序已經運行的任務的狀態,這樣它們就沒必要從新運行。
默認狀況下恢復是啓用的,可是能夠經過設置yarn.app.mapreduce.am.job.recovery
來禁用。
MapReduce client
輪詢Application Master
的進度報告,
但若是它的Application Master
失敗,客戶端須要定位新的實例。
在Job
初始化期間,client
向Resource Manager
請求Application Master
的地址,而後對其進行緩存,這樣在每次須要輪詢Application Master
時,
就不會向Resource Manager
發出請求,從而使Resource Manager
負擔太重。
可是,若是Application Master
失敗,client
將發出狀態更新時超時,此時client
將向Resource Manager
請求新的Application Master
的地址。
若是Node Manager
因崩潰或運行緩慢而發生故障,它將中止向Resource Manager
發送心跳(或發送頻率很是低)。
Resource Manager
若是在10分鐘內(yarn.resourcemanager.nm.liveness-monitor.expiry-interval-ms
)沒有收到Node Manager
的心跳信息,
就會告訴該Node Manager
,中止發送心跳,並將它從本身的Nodes
池中移除。
在此Node Manager
失敗的task
或 Application Master
都回按照以前的說的方式恢復。
此外,即便map tasks
在失敗的Node Manager
上運行併成功完成但屬於未完成的job
,
Application Master
也會安排它們從新運行,
由於它們的中間輸出駐留在故障Node Manager
的本地文件系統上,reduce
任務可能沒法訪問。
若是一個Node Manager
失敗任務次數過多,該Node Manager
會被Application Master
拉入黑名單。
對於 MapReduce,若是一個Job在某個
Node Manager
失敗3個任務(mapreduce.job.maxtaskfailures.per.tracker
),就會嘗試在其餘的結點進行調度。
注意,Resource Manager
不會跨應用程序執行黑名單(編寫時),
所以來自新做業的任務可能會在壞節點上調度,即便它們已被運行較早做業的應用程序主程序列入黑名單。
Resource Manager
失敗是很嚴重的,一旦它失敗, job
和task
容器都沒法啓動。
在默認配置中,Resource Manager
是一個單故障點,由於在(不太可能的)機器故障的狀況下,全部正在運行的做業都失敗了而且沒法恢復。
要實現高可用性(HA),必須在一個active-standby
配置中運行一對Resource Manager
。
若是 active Resource Manager
發生故障,則standby Resource Manager
能夠接管,而不會對client
形成重大中斷。
經過將運行中的應用程序信息存儲在高可用的狀態存儲區中(經過ZooKeeper/HDFS
備份),實現standby Resource Manager
恢復active Resource Manager
(失敗)的關鍵狀態。
Node Manager
信息沒有存儲在狀態存儲中,由於當節Node Manager
第一次心跳時,新的Resource Manager
能夠相對較快地對其進行重構。
由於
task
由Application Master
管理,因此task
不屬於Resource Manager
的狀態,所以於Resource Manager
存儲的狀態比jobtracker
中的狀態更容易管理。
目前,有兩種持久化RMStateStore的方式,分別爲:FileSystemRMStateStore
和ZKRMStateStore
。
總體架構以下:
咱們能夠經過手動或自動重啓ResourceManager
。
被提高爲active 狀態的ResourceManager加載ResourceManager內部狀態,並根據ResourceManager restart特性儘量從上一個active Resource Manager 離開的地方繼續操做。
對於之前提交給ResourceManager的每一個託管Application,都會產生一個新的嘗試。
應用程序能夠按期checkpoint,以免丟失任何工做。
狀態存儲必須在兩個Active/Standby Resource Managers中均可見。
從上圖能夠看到咱們能夠選擇的狀態存儲介質有兩個FileSystemRMStateStore
和 ZKRMStateStore
。
ZKRMStateStore
隱式地容許在任什麼時候間點對單ResourceManagers
進行寫訪問,
所以在HA集羣中推薦使用ZKRMStateStore。
在使用ZKRMStateStore
時,不須要單獨的防護機制來解決可能出現的腦裂狀況,即多個Resource Manager
可能扮演active
角色。
而且ResourceManager
能夠選擇嵌入基於zookeeper
的ActiveStandbyElector
來決定哪一個Resource Manager
應該是active
的。
當active
的ResourceManager
關閉或失去響應時,另外一個Resource Manager
會自動被選爲active
,而後由它接管。
注意,不須要像HDFS那樣運行一個單獨的
ZKFC
守護進程,由於嵌入在Resource Manager
中的ActiveStandbyElector
充當故障檢測器和leader elector
,因此不須要單獨的ZKFC
守護進程。
關於Yarn的內容就介紹到這裏,更詳細的內容能夠參考官網
以後會更新一些Hdfs讀寫的源碼追蹤相關文章,有興趣能夠關注【兔八哥雜談】