大數據系列4:Yarn以及MapReduce 2

系列文章:
大數據系列:一文初識Hdfs
大數據系列2:Hdfs的讀寫操做
大數據謝列3:Hdfs的HA實現html

經過前文,咱們對Hdfs的已經有了必定的瞭解,本文將繼續以前的內容,介紹YarnYarnMapReduce 2的應用node


MapReduce1 做業流程

在介紹Yarn以前,咱們先介紹一下Mapreduce1做業流程。算法

有了這個基礎,再去看看採用Yarn之後的MapReduce2到底有啥優點也許會有更好的理解。apache

首先先介紹一下相關的幾個實體:緩存

  1. Client:負責提交 MapReduce 做業
  2. jobtracker:協調做業運行,是一個Jave程序,主類爲JobTracker
  3. tasktracker:運行做業劃分後的任務,是一個Jave程序,主類爲TaskTracker
  4. Hdfs:分佈式文件系統,用於在其餘實體之間共享做業文件

做業流程圖以下:服務器

  1. MapReduce Program 調用runJob()建立JobClient並告知其提交做業。
    在提交做業後runJob()會每秒輪詢做業進度,若是發生改變就把進度輸出控制檯。
    做業成後輸出做業計數器,若是失敗,則輸出失敗信息。架構

  2. JobClient經過調用JobTracker.getNewJobId()請求一個新的JoobIdapp

  3. 將運行做業須要的資源(做業Jar文件,配置文件,計算所得的輸入分片)複製到以JobId命名的目錄下jobtrackerHDFS中。
    做業Jar的會有多個副本(mapred.submit.replication默認10),在運行做業的時候,tasktracker能夠訪問多個副本。框架

  4. 調用JobTracker.submitJob()方法告知jobtracker做業準備執行。運維

  5. JobTracker接收到對submitJob()的調用後,會把改調用放入一個內部隊列,交由做業調度起(job scheduler)進行調度。
    同時會對Job初始化,包括建立一個表示Job正在運行的對象,用來封裝任務和記錄的信息,用於追蹤任務的狀態和進程。

  6. 爲了建立人物運行列表,做業調度起會從共享文件系統中獲取JobCient已經計算好的輸入分片信息。
    而後爲每個分片建立一個map任務。
    至於reduce任務則由JonConfmapred.reduce.task決定,經過setNumReduceTask()設置,
    而後調度器建立相應數量的reduce任務。
    此時會被指定任務ID

  7. tasktrackerjobtracker之間維持一個心跳,
    做爲消息通道,tasktracker或告知自身存活狀況與是否能夠運行新的任務。
    根據信息,jobtracker會決定是否爲tasktracker分配任務(經過調度算法)。
    這個過程當中,對於map任務會考慮數據本地性,對於reduce則不須要。

  8. 一旦tasktracker被分配了任務,接下里就是執行,首先經過Hdfs把做業的Jar文件複製到tasktracker所在的文件系統。
    實現做業Jar本地化。
    同時,tasktracker把須要的文件從Hdfs複製到本地磁盤。
    而後爲任務創建一個本地工做目錄,並將Jar中的呢容解壓到這裏。
    最後建立一個TaskRunner實例運行該任務。

  9. TaskerRunner啓動一個新的JVM用來運行每個任務。

  10. 分別執行MapTask或者ReduceTask,結束後告知TaskTracker結束信息,同時TaskTracker將該信息告知JobTracker

上面就是Maopreduce1做業運行的流程。咱們先有個概念,後面介紹Yarn的時候作下對比。

這裏說的Mapreduce1 指的是Hadoop初始版本(版本1以及更早的)中的Mapreduce分佈式執行框架,也就是咱們上面的做業流程。
Mapreduce2 指的是使用Yarn(Hadoop 2 以及之後版本)的Mapreduce執行方式。
這裏Mapreduce一、Mapreduce2指的不是Hadoop版本,指的是Mapreduce程序的不一樣執行機制而已。


Yarn

Yarn (Yet Another Resource Negotiator)是在Hadoop 2引入的集羣資源管理系統,最初的目的是爲了改善MapReduce的實現。
可是因爲其具備強大的通用性,能夠支持其餘的分佈式計算框架。

在引入的Yarn後,Hadoop 2的生態就發生了一變化,以下:

Yarn提供請求和使用集羣資源的API,可是通常都是由分佈式框架(Saprk、Flink等)內部調用這些API
用戶則使用分佈式系統提供的更高層的API

這種方式向用戶隱藏了資源管理的細節,必定程度上下降了開發難度和運維成本。


Yarn的結構很簡單,以下

Yarn的核心思想是將資源管理和做業調度/監視功能拆分爲單獨的守護進程。

具體實現就是:
一個管理集羣上資源使用的全局資源管理器(RM,ResourceManager);
運行在集羣全部結點上而且可以啓動和監控容器(Container)的結點管理器(Node Manager)

Container是用於執行特定應用程序的進程,每一個資源都有資源限制(內存、CPU等)
Container能夠是Unix進程,也能夠Linux cgroup

Yarn的組成介紹就這麼簡單,接下來咱們就看看它怎麼提交執行一個任務。


提交任務

這裏分爲兩部分,
第一部分會介紹Yarn任務提交流程,
第二部分會介紹Mapreduce 2 的提交流程


Yarn任務提交流程

Yarn 任務的提交流程以下:

  1. 爲了在Yarn上運行任務,Client會向ResourceManager發出運行 Application Master process的請求。

  2. Resource Manager找到一個能夠運行Application MasterNodeManager

  3. NodeMager啓動一個容器,運行Application Master

  4. 時Application Master會作什麼操做取決於Application自己,
    能夠是在在Application Master執行一個簡單計算任務,將結果返回Client
    也能夠向Resource Manager申請更多容器。

  5. 申請到更多的container

從上面的步驟能夠發現,Yarn自己是不會爲應用的各個部分(Client, Master, Process)之間提供交互。
大多數基於Yarn的任務使用某些遠程通訊機制(好比Hadoop RPC)向客戶端傳遞信息。
這些RPC通訊機制通常都是專屬於該應用的。


MapReduce 2 任務提交流程

有了上面的基礎,具體的應用怎麼提交。
此處選用MapReduce 2,與一開始MapReduce 1作個對比

涉及到五個實體:

  1. Client:提交 MapReduce job的客戶端
  2. YARN Resource Manager:負責協調分配集羣計算資源
  3. YARN Node Managers:啓動並監視集羣中機器上的計算容器。
  4. MapReduce Application Master:協調MapReduce job的任務執行。
  5. HDFS:用於在其餘實體之間共享Job文件

Application MasterMapReduce Tasks 在容器中運行,他們由Resource Manager調度,由Node Managers管理

提交流程以下:

  1. Job.sumbit()方法建立一個內部的JobSummiter實例,並調用其sunbmitJobInternal()方法。
    做業提交後,waitForCompletion()會每秒輪詢返回做業的進度。
    若是做業完成後,若是成功則顯示做業計數器,不然輸出錯誤。

  2. JobSummiterResource Manager申請一個用於 MapReduce job ID的新Application ID
    這個過程會檢查做業,輸出說明:
    例如,若是沒有指定輸出目錄,或者輸出目錄已經存在,則不會提交做業,並向MapReduce程序拋出錯誤;
    計算做業的輸入分片。
    若是沒法計算分片(例如,由於輸入路徑不存在),則做業不提交,並向MapReduce程序拋出錯誤。

  3. 將運行做業須要的資源(做業Jar文件,配置文件,計算所得的輸入分片)複製到以JobId命名的HDFS的目錄下。
    做業Jar的會有多個副本(mapreduce.client.submit.file.replication默認10),
    Node Managers運行任務時,能夠跨集羣訪問許多副本。

  4. 經過調用Resource ManagersubmitApplication()提交任務。

  5. Resource Manager收到submitApplication()的調用請求後,將請求傳遞給Yarn的調度器(Scheduler)。
    Scheduler會爲其分配一個容器,

  6. Node Manager在容器中啓動一個Application Master,主類爲MRAppMaster

  7. 因爲MRAppMaster將從任務接收進度和完成報告,它經過建立許多簿記對象(bookkeeping objects)來初始化做業,以跟蹤做業的進度。

  8. 接下來,MRAppMaster從共享文件系統檢索在客戶機中計算的輸入切片,
    它會爲每一個切片創建一個map task;
    創建mapreduce.job.reduces(由Job.setNumReduceTasks())數量的reduce task
    MRAppMaster根據任務的狀況決定是執行一個uber task仍是向Resource Manager請求更多的資源。

  9. MRAppMasterResource Managerjob中全部的map、reduce tasks申請容器。

  10. 一旦Resource ManagerSchedulertask在指定的Node Manager分配了容器之後,Application Master就會請求Node Manager分配容器。

  11. Node Manager收到請,啓動容器。容器中的主類爲YarnChild,運行在專用的JVM中,因此map、reduce、甚至YarnChild自己出現的錯誤都不會影響Node Manager

  12. 在運行task以前,YarnChild會對任務須要的資源進行本地化,包括job配置、JAR文件以及其餘來自Hdfs的文件。

  13. 最後執行map 或 reduce 任務。

關於的ubertask細節說明:

MRAppMaster必須決定如何運行MapReduce job
利用並行的優點,確實能夠提升任務的執行效率,
可是在小任務或少任務的狀況下,
在新的容器中分配和運行任務所額外消耗的時間大於並行執行帶來效率的提高。
這個時候在一個節點上順序運行這些任務反而能得到更好的效率。
這樣的job被稱爲uber task

簡單的說就是並行執行的時候任務效率的提高還不夠彌補你從新申請資源、建立容器、分發任務等消耗的時間。

那麼怎樣纔算small job呢?

默認狀況下:small job是有少於10個mapper,只有一個reducer,一個輸入大小小於一個HDFS Block大小的job。
固然也能夠經過參數 mapreduce.job.ubertask.maxmaps ,mapreduce.job.ubertask.maxreduces , mapreduce.job.ubertask.maxbytes 進行設置。
對於Ubertasks,mapreduce.job.ubertask.enable必須設置爲true

對於步驟9補充說明:

在這個過程當中,會先申請map任務的容器,
由於全部的map任務都必須在reduce的排序階段開始以前完成(Shuffle and Sort機制)。
reduce任務的請求直到5%的map任務完成纔會發出(reduce slow start機制)。
對於reduce任務,能夠在集羣的任何結點運行,
可是對map任務,會有數據本地性的要求(詳情此處不展開)
申請還爲任務指定內存和cpu。默認狀況下,每一個mapreduce任務分配1024 MB內存和1個虛擬核,
能夠經過mapreduce.map.memory.mb , mapreduce.reduce.memory.mb , mapreduce.map.cpu.vcores mapreduce.reduce.cpu.vcores進行配置


Yarn 與mapreduce 1

上面就是Mapreduce2的任務提交執行流程,一開始咱們就介紹了Mapreduce1,如今咱們對比下兩個有啥區別。

本質就是結合Mapreduce 2 對比YarnMapreduce1調度的區別,因此後面Mapreduce 2 直接用Yarn替換

Mapreduce 1 中,做業執行過程由兩類守護進程控制,分別爲一個jobtracker和多個tasktracker

jobtracker 經過調度tasktracker上的任務來協調運行在系統的Job,並記錄返回的任務進度。
tasktracker負責運行任務並向`jobtracker``發送任務進度。

jobtracker同時負責做業的調度(分配任務與tasktracker匹配)和任務進度監控(任務跟蹤、失敗重啓、記錄流水、維護進度、計數器等)

Yarn 中,也有兩類守護進程Resource ManagerNonde Manager分別類比jobtrackertasktracker

可是不同的地方在於,jobtracker的職責在Yarn中被拆分,由兩個實體Reource Manger 和``Application Master```(每一個Job有一個)。

jobtracker 也能夠存儲做業歷史,或者經過運行一個獨立守護進程做爲歷史做業服務器。而與對應的,Yarn提供一個時間軸服務器(timeline server)來存儲應用的歷史。

兩者組成類比

Mapreduce 1 Yarn
jobtracker Reource Manger
Application Master
timeline server
tasktracker Nonde Manager
Slot container

對於兩者的區別,心血來潮想了個例子,但願可以幫助理解。
有三個角色:皇帝、大臣、打工人

如今有兩個狀況,
1:發生水災,須要賑災
2:敵寇入侵,邊疆告急

在這種狀況向 Mapreduce 1 的作法是:

Yarn的作法:

簡單的說,就是Yarn讓專業的人作專業的事情。
遇到事情找個專家,我只負責提需求和提供資源,
其餘的讓專家去作。

這個專家就是MRAppMaster(Mapreduce),而對應的Spark也有本身的專家

由此也已總結下Yarn帶來的優點:

  1. 可拓展性(Scalability)
    Yarn能夠在比MapReduce 1更大的集羣上運行。
    MapReduce 1在4000個節點和40000個任務的時候達到拓展性的瓶頸。
    主要是由於jobtracker須要管理做業和任務。
    Yarn就拆分了這個,將做業與任務拆分,由Manager/Application Master分別負責,能夠輕鬆將拓展至10,000 個節點 100,000 個任務。
  2. 可用性(Availability)
    高可用性(HA)一般是經過複製另外一個守護進程所需的狀態來實現的,以便在活躍狀態的守護進程掛掉時接管提供服務所需的工做。
    可是,jobtracker的內存中有大量快速變化的複雜狀態(例如,每一個任務狀態每隔幾秒更新一次),這使得將在jobtracker服務配置HA很是困難。
    而對於Yarn而言,因爲職責被拆分,那麼HA也隨之變成了分治問題。
    能夠先提供Resource Manager的HA,同時若是有須要能夠爲每一個人應用也提供HA。

實際上對於Mapreduce 2Resource ManagerApplication Master都提供了HA,稍候介紹。

  1. 利用率(Utilization)
    MapReduce 1 中,每一個tasktracker都靜態配置若干個slot,在配置的時候被劃分爲map slotreduce slot,只能執行固定的任務。
    而在Yarn中,Node Manager管理一個資源池,只要有資源,任務就能夠運行。
    同時資源是精細化管理的,任務能夠按需申請資源。
  2. 多租戶(Multitenancy
    其實,某種程度上來講,統一的資源管理,向其餘分佈式應用開放Hadoop纔是Yarn的最大優點。
    好比咱們能夠部署Spark、Flink等等。此時MapReduce也僅僅是在這之上的一個應用罷了

High Availability

接下來在說一下HA吧。
這裏主要結合Mapreduce 2 來講明
HA 針對的都是出現錯誤或失敗的狀況。
在咱們這裏,出現錯誤或失敗的場景有如下幾個

  1. 任務失敗
  2. Application Master失敗
  3. Node Manager失敗
  4. Resource Manager失敗

接下來咱們分別看看這些狀況怎麼解決。


task 失敗

任務失敗的狀況有可能出現下面的狀況:

  1. 用戶map、reduce task代碼問題,這種失敗最多見,此時在task JVM在退出前會向Application Master發送錯誤報告,該報告會被計入用戶日誌。最後Application Master會將該任務將任務嘗試標記爲failed,並釋放容器,使其資源可供另外一個任務使用。
  2. 另外一種狀況是task JVM忽然退出,可能存在一個JVM bug,致使JVM在特定環境下退出MapReduce的用戶代碼。
    這種狀況下,Node Manager發現進程已經退出,會告知Application Master,並將任務嘗試標記爲failed
  3. 還有一種是任務超時或者掛起,一旦Application Master注意到有一段時間沒有收到任務進度更新了,就會將該任務標記爲failed,由參數mapreduce.task.timeout(默認10分鐘,0表示關閉超時,此時長時間運行任務永遠不會標記爲failed,慎用)設置。

task 失敗的處理方式:

  1. Application Master發現任務失敗後,會從新調度該任務,會進行避免在以前失敗的Node Manager上調度該任務。
    若是一個任務連續失敗四次(mapreduce.map.maxattempts,mapreduce.reduce.maxattempts),就不會繼續重調,整個Job也就失敗。
  2. 而有些場景在少數任務失敗,結果仍舊可使用,那麼此時咱們不但願中止任務,能夠配置一個容許任務失敗的閥值(百分比),此時不會觸發Job失敗。
    經過mapreduce.map.failures.maxpercentmapreduce.reduce.failures.maxpercent設置。
  3. 還有一個狀況是任務嘗試被kill,這種狀況Application Master制動標記killed不屬於任務失敗。

推測機制(Speculative Execution),若是發現task執行的時間運行速度明顯慢於平均水平,就會在其餘的結點啓動一個相同的任務,稱爲推測執行。
這個不必定有效,僅僅是投機性的嘗試。
當任務成功完成時,任何正在運行的重複任務都將被終止,由於再也不須要它們。
就是推測任務與原始任務誰能上位就看誰先完成了。


Application Master 失敗

當遇到Application Master失敗是,Yarn也會進行嘗試。
能夠經過配置mapreduce.am.max-attempts property(默認:2)配置重試次數,
同時,Yarn對於集羣中運行的Application Master最大嘗試次數加了限制,也須要經過 yarn.resourcemanager.am.max-attempts(默認:2)進行配置。

重試的流程以下:

Application MasterResource Manager發送心跳,若是Application Master發生故障,Resource Manager將檢測故障,並在新的容器中啓動運行Application Master的新實例

在MapReduce,它可使用做業歷史記錄來恢復(失敗的)應用程序已經運行的任務的狀態,這樣它們就沒必要從新運行。
默認狀況下恢復是啓用的,可是能夠經過設置yarn.app.mapreduce.am.job.recovery來禁用。


MapReduce client輪詢Application Master的進度報告,
但若是它的Application Master失敗,客戶端須要定位新的實例。

Job初始化期間,clientResource Manager請求Application Master的地址,而後對其進行緩存,這樣在每次須要輪詢Application Master時,
就不會向Resource Manager發出請求,從而使Resource Manager負擔太重。

可是,若是Application Master失敗,client將發出狀態更新時超時,此時client將向Resource Manager請求新的Application Master的地址。


Node Manager 失敗

若是Node Manager因崩潰或運行緩慢而發生故障,它將中止向Resource Manager發送心跳(或發送頻率很是低)。
Resource Manager 若是在10分鐘內(yarn.resourcemanager.nm.liveness-monitor.expiry-interval-ms )沒有收到Node Manager的心跳信息,
就會告訴該Node Manager,中止發送心跳,並將它從本身的Nodes池中移除。

在此Node Manager失敗的taskApplication Master 都回按照以前的說的方式恢復。

此外,即便map tasks在失敗的Node Manager上運行併成功完成但屬於未完成的job
Application Master也會安排它們從新運行,
由於它們的中間輸出駐留在故障Node Manager的本地文件系統上,reduce任務可能沒法訪問。

若是一個Node Manager失敗任務次數過多,該Node Manager會被Application Master拉入黑名單。

對於 MapReduce,若是一個Job在某個Node Manager失敗3個任務( mapreduce.job.maxtaskfailures.per.tracker),就會嘗試在其餘的結點進行調度。
注意,Resource Manager不會跨應用程序執行黑名單(編寫時),
所以來自新做業的任務可能會在壞節點上調度,即便它們已被運行較早做業的應用程序主程序列入黑名單。


Resource Manager 失敗

Resource Manager失敗是很嚴重的,一旦它失敗, jobtask容器都沒法啓動。
在默認配置中,Resource Manager是一個單故障點,由於在(不太可能的)機器故障的狀況下,全部正在運行的做業都失敗了而且沒法恢復。

要實現高可用性(HA),必須在一個active-standby配置中運行一對Resource Manager
若是 active Resource Manager發生故障,則standby Resource Manager能夠接管,而不會對client形成重大中斷。

經過將運行中的應用程序信息存儲在高可用的狀態存儲區中(經過ZooKeeper/HDFS備份),實現standby Resource Manager 恢復active Resource Manager(失敗)的關鍵狀態。

Node Manager 信息沒有存儲在狀態存儲中,由於當節Node Manager 第一次心跳時,新的Resource Manager能夠相對較快地對其進行重構。

由於taskApplication Master管理,因此task不屬於Resource Manager的狀態,所以於Resource Manager存儲的狀態比jobtracker中的狀態更容易管理。

目前,有兩種持久化RMStateStore的方式,分別爲:FileSystemRMStateStoreZKRMStateStore

總體架構以下:

咱們能夠經過手動或自動重啓ResourceManager

被提高爲active 狀態的ResourceManager加載ResourceManager內部狀態,並根據ResourceManager restart特性儘量從上一個active Resource Manager 離開的地方繼續操做。

對於之前提交給ResourceManager的每一個託管Application,都會產生一個新的嘗試。
應用程序能夠按期checkpoint,以免丟失任何工做。
狀態存儲必須在兩個Active/Standby Resource Managers中均可見。

從上圖能夠看到咱們能夠選擇的狀態存儲介質有兩個FileSystemRMStateStoreZKRMStateStore

ZKRMStateStore隱式地容許在任什麼時候間點對單ResourceManagers進行寫訪問,
所以在HA集羣中推薦使用ZKRMStateStore。

在使用ZKRMStateStore時,不須要單獨的防護機制來解決可能出現的腦裂狀況,即多個Resource Manager可能扮演active角色。

而且ResourceManager能夠選擇嵌入基於zookeeperActiveStandbyElector來決定哪一個Resource Manager應該是active的。
activeResourceManager關閉或失去響應時,另外一個Resource Manager會自動被選爲active,而後由它接管。

注意,不須要像HDFS那樣運行一個單獨的ZKFC守護進程,由於嵌入在Resource Manager中的ActiveStandbyElector充當故障檢測器和leader elector,因此不須要單獨的ZKFC守護進程。


關於Yarn的內容就介紹到這裏,更詳細的內容能夠參考官網

以後會更新一些Hdfs讀寫的源碼追蹤相關文章,有興趣能夠關注【兔八哥雜談】

相關文章
相關標籤/搜索