【大數據】Spark內核解析

 

1. Spark 內核概述

Spark內核泛指Spark的核心運行機制,包括Spark核心組件的運行機制、Spark任務調度機制、Spark內存管理機制、Spark核心功能的運行原理等,熟練掌握Spark內核原理,可以幫助咱們更好地完成Spark代碼設計,並可以幫助咱們準確鎖定項目運行過程當中出現的問題的癥結所在。html

1.1 Spark核心組件回顧

1.1.1 Driver

Spark驅動器節點,用於執行Spark任務中的main方法,負責實際代碼的執行工做。Driver在Spark做業執行時主要負責:node

  1. 將用戶程序轉化爲任務(job);
  2. Executor之間調度任務(task);
  3. 跟蹤Executor的執行狀況;
  4. 經過UI展現查詢運行狀況;

1.1.2 Executor

Spark Executor節點是一個JVM進程,負責在 Spark 做業中運行具體任務,任務彼此之間相互獨立。Spark 應用啓動時,Executor節點被同時啓動,而且始終伴隨着整個 Spark 應用的生命週期而存在。若是有Executor節點發生了故障或崩潰,Spark 應用也能夠繼續執行,會將出錯節點上的任務調度到其餘Executor節點上繼續運行。面試

Executor有兩個核心功能:算法

1. 負責運行組成Spark應用的任務,並將結果返回給Driver進程;編程

2. 它們經過自身的塊管理器(Block Manager)爲用戶程序中要求緩存的 RDD 提供內存式存儲。RDD 是直接緩存在Executor進程內的,所以任務能夠在運行時充分利用緩存數據加速運算。json

1.2 Spark通用運行流程概述

 

1-1 Spark核心運行流程api

1-1Spark通用運行流程,不論Spark以何種模式進行部署,任務提交後,都會先啓動Driver進程,隨後Driver進程向集羣管理器註冊應用程序,以後集羣管理器根據此任務的配置文件分配Executor並啓動,當Driver所需的資源所有知足後,Driver開始執行main函數,Spark查詢爲懶執行,當執行到action算子時開始反向推算,根據寬依賴進行stage的劃分,隨後每個stage對應一個taskset,taskset中有多個task,根據本地化原則,task會被分發到指定的Executor去執行,在任務執行的過程當中,Executor也會不斷與Driver進行通訊,報告任務運行狀況。數組

2. Spark 部署模式

Spark支持3種集羣管理器(Cluster Manager),分別爲:緩存

  1. Standalone:獨立模式,Spark原生的簡單集羣管理器,自帶完整的服務,可單獨部署到一個集羣中,無需依賴任何其餘資源管理系統,使用Standalone能夠很方便地搭建一個集羣;
  2. Apache Mesos:一個強大的分佈式資源管理框架,它容許多種不一樣的框架部署在其上,包括yarn;
  3. Hadoop YARN:統一的資源管理機制,在上面能夠運行多套計算框架,如map reduce、storm等,根據driver在集羣中的位置不一樣,分爲yarn client和yarn cluster。

實際上,除了上述這些通用的集羣管理器外,Spark內部也提供了一些方便用戶測試和學習的簡單集羣部署模式。因爲在實際工廠環境下使用的絕大多數的集羣管理器是Hadoop YARN,所以咱們關注的重點是Hadoop YARN模式下的Spark集羣部署。服務器

Spark的運行模式取決於傳遞給SparkContext的MASTER環境變量的值,個別模式還須要輔助的程序接口來配合使用,目前支持的Master字符串及URL包括:

2-1 Spark運行模式配置

Master URL

Meaning

local

在本地運行,只有一個工做進程,無並行計算能力。

local[K]

在本地運行,有K個工做進程,一般設置K爲機器的CPU核心數量。

local[*]

在本地運行,工做進程數量等於機器的CPU核心數量。

spark://HOST:PORT

以Standalone模式運行,這是Spark自身提供的集羣運行模式,默認端口號: 7077。詳細文檔見:Spark standalone cluster。

mesos://HOST:PORT

在Mesos集羣上運行,Driver進程和Worker進程運行在Mesos集羣上,部署模式必須使用固定值:--deploy-mode cluster。詳細文檔見:MesosClusterDispatcher.

yarn-client

在Yarn集羣上運行,Driver進程在本地,Work進程在Yarn集羣上,部署模式必須使用固定值:--deploy-mode client。Yarn集羣地址必須在HADOOP_CONF_DIR or YARN_CONF_DIR變量裏定義。

yarn-cluster

在Yarn集羣上運行,Driver進程在Yarn集羣上,Work進程也在Yarn集羣上,部署模式必須使用固定值:--deploy-mode cluster。Yarn集羣地址必須在HADOOP_CONF_DIR or YARN_CONF_DIR變量裏定義。

用戶在提交任務給Spark處理時,如下兩個參數共同決定了Spark的運行方式。

· –master MASTER_URL :決定了Spark任務提交給哪一種集羣處理。

· –deploy-mode DEPLOY_MODE:決定了Driver的運行方式,可選值爲Client或者Cluster。

2.1 Standalone模式運行機制

Standalone集羣有四個重要組成部分,分別是:

1) Driver:是一個進程,咱們編寫的Spark應用程序就運行在Driver上,由Driver進程執行;

2) Master:是一個進程,主要負責資源的調度和分配,並進行集羣的監控等職責;

3) Worker:是一個進程,一個Worker運行在集羣中的一臺服務器上,主要負責兩個職責,一個是用本身的內存存儲RDD的某個或某些partition;另外一個是啓動其餘進程和線程(Executor),對RDD上的partition進行並行的處理和計算。

4) Executor:是一個進程,一個Worker上能夠運行多個Executor,Executor經過啓動多個線程(task)來執行對RDD的partition進行並行計算,也就是執行咱們對RDD定義的例如map、flatMap、reduce等算子操做。

2.1.1 Standalone Client模式

 

2-1 Standalone Client模式

Standalone Client模式下,Driver在任務提交的本地機器上運行,Driver啓動後向Master註冊應用程序,Master根據submit腳本的資源需求找到內部資源至少能夠啓動一個Executor的全部Worker,而後在這些Worker之間分配Executor,Worker上的Executor啓動後會向Driver反向註冊,全部的Executor註冊完成後,Driver開始執行main函數,以後執行到Action算子時,開始劃分stage,每一個stage生成對應的taskSet,以後將task分發到各個Executor上執行。

2.1.2 Standalone Cluster模式

 

2-2 Standalone Cluster模式

Standalone Cluster模式下,任務提交後,Master會找到一個Worker啓動Driver進程, Driver啓動後向Master註冊應用程序,Master根據submit腳本的資源需求找到內部資源至少能夠啓動一個Executor的全部Worker,而後在這些Worker之間分配Executor,Worker上的Executor啓動後會向Driver反向註冊,全部的Executor註冊完成後,Driver開始執行main函數,以後執行到Action算子時,開始劃分stage,每一個stage生成對應的taskSet,以後將task分發到各個Executor上執行。

注意,Standalone的兩種模式下(client/Cluster),Master在接到Driver註冊Spark應用程序的請求後,會獲取其所管理的剩餘資源可以啓動一個Executor的全部Worker,而後在這些Worker之間分發Executor,此時的分發只考慮Worker上的資源是否足夠使用,直到當前應用程序所需的全部Executor都分配完畢,Executor反向註冊完畢後,Driver開始執行main程序。

2.2 YARN模式運行機制

2.2.1 YARN Client模式

 

2-3 YARN Client模式

YARN Client模式下,Driver在任務提交的本地機器上運行,Driver啓動後會和ResourceManager通信申請啓動ApplicationMaster,隨後ResourceManager分配container,在合適的NodeManager上啓動ApplicationMaster,此時的ApplicationMaster的功能至關於一個ExecutorLaucher,只負責向ResourceManager申請Executor內存。

ResourceManager接到ApplicationMaster的資源申請後會分配container,而後ApplicationMaster在資源分配指定的NodeManager上啓動Executor進程,Executor進程啓動後會向Driver反向註冊,Executor所有註冊完成後Driver開始執行main函數,以後執行到Action算子時,觸發一個job,並根據寬依賴開始劃分stage,每一個stage生成對應的taskSet,以後將task分發到各個Executor上執行。

2.2.2 YARN Cluster模式

 

2-4 YARN Cluster模式

YARN Cluster模式下,任務提交後會和ResourceManager通信申請啓動ApplicationMaster,隨後ResourceManager分配container,在合適的NodeManager上啓動ApplicationMaster,此時的ApplicationMaster就是Driver。

Driver啓動後向ResourceManager申請Executor內存,ResourceManager接到ApplicationMaster的資源申請後會分配container,而後在合適的NodeManager上啓動Executor進程,Executor進程啓動後會向Driver反向註冊,Executor所有註冊完成後Driver開始執行main函數,以後執行到Action算子時,觸發一個job,並根據寬依賴開始劃分stage,每一個stage生成對應的taskSet,以後將task分發到各個Executor上執行。

3. Spark 通信架構

3.1 Spark通訊架構概述

Spark2.x版本使用Netty通信框架做爲內部通信組件。spark 基於Netty新的rpc框架借鑑了Akka中的設計,它是基於Actor模型,以下圖所示:

 

4-1 Actor模型

Spark通信框架中各個組件(Client/Master/Worker)能夠認爲是一個個獨立的實體,各個實體之間經過消息來進行通訊。具體各個組件之間的關係圖以下:

 

4-2 Spark通信架構

Endpoint(Client/Master/Worker)有1個InBox和N個OutBoxN>=1N取決於當前Endpoint與多少其餘的Endpoint進行通訊,一個與其通信的其餘Endpoint對應一個OutBox),Endpoint接收到的消息被寫入InBox,發送出去的消息寫入OutBox並被髮送到其餘Endpoint的InBox中。

3.2 Spark通信架構解析

Spark通訊架構以下圖所示:

 

4-3 Spark通信架構

1) RpcEndpoint:RPC端點,Spark針對每一個節點(Client/Master/Worker)都稱之爲一個Rpc端點,且都實現RpcEndpoint接口,內部根據不一樣端點的需求,設計不一樣的消息和不一樣的業務處理,若是須要發送(詢問)則調用Dispatcher;

2) RpcEnv:RPC上下文環境,每一個RPC端點運行時依賴的上下文環境稱爲RpcEnv;

3) Dispatcher:消息分發器,針對於RPC端點須要發送消息或者從遠程RPC接收到的消息,分發至對應的指令收件箱/發件箱。若是指令接收方是本身則存入收件箱,若是指令接收方不是本身,則放入發件箱;

4) Inbox:指令消息收件箱,一個本地RpcEndpoint對應一個收件箱,Dispatcher在每次向Inbox存入消息時,都將對應EndpointData加入內部ReceiverQueue中,另外Dispatcher建立時會啓動一個單獨線程進行輪詢ReceiverQueue,進行收件箱消息消費;

5) RpcEndpointRef:RpcEndpointRef是對遠程RpcEndpoint的一個引用。當咱們須要向一個具體的RpcEndpoint發送消息時,通常咱們須要獲取到該RpcEndpoint的引用,而後經過該應用發送消息。

6) OutBox:指令消息發件箱,對於當前RpcEndpoint來講,一個目標RpcEndpoint對應一個發件箱,若是向多個目標RpcEndpoint發送信息,則有多個OutBox。當消息放入Outbox後,緊接着經過TransportClient將消息發送出去。消息放入發件箱以及發送過程是在同一個線程中進行;

7) RpcAddress:表示遠程的RpcEndpointRef的地址,Host + Port。

8) TransportClient:Netty通訊客戶端,一個OutBox對應一個TransportClient,TransportClient不斷輪詢OutBox,根據OutBox消息的receiver信息,請求對應的遠程TransportServer;

9) TransportServer:Netty通訊服務端,一個RpcEndpoint對應一個TransportServer,接受遠程消息後調用Dispatcher分發消息至對應收發件箱;

根據上面的分析,Spark通訊架構的高層視圖以下圖所示:

 

4-4 Spark通訊框架高層視圖

4. SparkContext解析

Spark中由SparkContext負責與集羣進行通信、資源的申請以及任務的分配和監控等。當Worker節點中的Executor運行完畢Task後,Driver同時負責將SparkContext關閉。一般也可使用SparkContext來表明驅動程序(Driver)。

 

4-1 SparkContext與集羣交互

SparkContext是用戶通往Spark集羣的惟一入口,能夠用來在Spark集羣中建立RDD、累加器和廣播變量。SparkContext也是整個Spark應用程序中相當重要的一個對象,能夠說是整個Application運行調度的核心(不包括資源調度)。

SparkContext的核心做用是初始化Spark應用程序運行所需的核心組件,包括高層調度器(DAGScheduler)、底層調度器(TaskScheduler)和調度器的通訊終端(SchedulerBackend),同時還會負責Spark程序向Cluster Manager的註冊等。

 

4-2 SparkContext初始化組件

在實際的編碼過程當中,咱們會先建立SparkConf實例,並對SparkConf的屬性進行自定義設置,隨後,將SparkConf做爲SparkContext類的惟一構造參數傳入來完成SparkContext實例對象的建立。

SparkContext在實例化的過程當中會初始化DAGScheduler、TaskScheduler和SchedulerBackend,當RDDaction算子觸發了做業(Job)後,SparkContext會調用DAGScheduler根據寬窄依賴將Job劃分紅幾個小的階段(Stage),TaskScheduler會調度每一個Stage的任務(Task),另外,SchedulerBackend負責申請和管理集羣爲當前Application分配的計算資源(即Executor)。

若是咱們將Spark Application比做汽車,那麼SparkContext就是汽車的引擎,而SparkConf就是引擎的配置參數。

下圖描述了Spark-On-Yarn模式下在任務調度期間,ApplicationMaster、Driver以及Executor內部模塊的交互過程:

 

圖4-2 Spark組件交互過程

Driver初始化SparkContext過程當中,會分別初始化DAGScheduler、TaskScheduler、SchedulerBackend以及HeartbeatReceiver,並啓動SchedulerBackend以及HeartbeatReceiver。SchedulerBackend經過ApplicationMaster申請資源,並不斷從TaskScheduler中拿到合適的Task分發到Executor執行HeartbeatReceiver負責接收Executor的心跳信息,監控Executor的存活情況,並通知到TaskScheduler。

5. Spark 任務調度機制

在工廠環境下,Spark集羣的部署方式通常爲YARN-Cluster模式,以後的內核分析內容中咱們默認集羣的部署方式爲YARN-Cluster模式。

5.1 Spark任務提交流程

在上一章中咱們講解了Spark YARN-Cluster模式下的任務提交流程,以下圖所示:

 

圖5-1 YARN-Cluster任務提交流程

下面的時序圖清晰地說明了一個Spark應用程序從提交到運行的完整流程:

 

圖5-2 Spark任務提交時序圖

提交一個Spark應用程序,首先經過Client向ResourceManager請求啓動一個Application,同時檢查是否有足夠的資源知足Application的需求,若是資源條件知足,則準備ApplicationMaster的啓動上下文,交給ResourceManager,並循環監控Application狀態。

當提交的資源隊列中有資源時,ResourceManager會在某個NodeManager上啓動ApplicationMaster進程,ApplicationMaster會單獨啓動Driver後臺線程,當Driver啓動後,ApplicationMaster會經過本地的RPC鏈接Driver,並開始向ResourceManager申請Container資源運行Executor進程(一個Executor對應與一個Container),當ResourceManager返回Container資源,ApplicationMaster則在對應的Container上啓動Executor。

Driver線程主要是初始化SparkContext對象,準備運行所需的上下文,而後一方面保持與ApplicationMaster的RPC鏈接,經過ApplicationMaster申請資源,另外一方面根據用戶業務邏輯開始調度任務,將任務下發到已有的空閒Executor上。

ResourceManager向ApplicationMaster返回Container資源時,ApplicationMaster就嘗試在對應的Container上啓動Executor進程,Executor進程起來後,會向Driver反向註冊,註冊成功後保持與Driver的心跳,同時等待Driver分發任務,當分發的任務執行完畢後,將任務狀態上報給Driver。

從上述時序圖可知,Client只負責提交Application並監控Application的狀態。對於Spark的任務調度主要是集中在兩個方面: 資源申請和任務分發,其主要是經過ApplicationMaster、Driver以及Executor之間來完成。

5.2 Spark任務調度概述

Driver起來後,Driver則會根據用戶程序邏輯準備任務,並根據Executor資源狀況逐步分發任務。在詳細闡述任務調度前,首先說明下Spark裏的幾個概念。一個Spark應用程序包括Job、Stage以及Task三個概念:

l Job是以Action方法爲界,遇到一個Action方法則觸發一個Job;

l Stage是Job的子集,以RDD寬依賴(即Shuffle)爲界,遇到Shuffle作一次劃分;

l Task是Stage的子集,以並行度(分區數)來衡量,分區數是多少,則有多少個task。

Spark的任務調度整體來講分兩路進行,一路是Stage級的調度,一路是Task級的調度,整體調度流程以下圖所示:

 

圖5-3 Spark任務調度概覽

Spark RDD經過其Transactions操做,造成了RDD血緣關係圖,即DAG,最後經過Action的調用,觸發Job並調度執行。DAGScheduler負責Stage級的調度,主要是將job切分紅若干Stages,並將每一個Stage打包成TaskSet交給TaskScheduler調度。TaskScheduler負責Task級的調度,將DAGScheduler給過來的TaskSet按照指定的調度策略分發到Executor上執行,調度過程當中SchedulerBackend負責提供可用資源,其中SchedulerBackend有多種實現,分別對接不一樣的資源管理系統。

5.3 Spark Stage級調度

Spark的任務調度是從DAG切割開始,主要是由DAGScheduler來完成。當遇到一個Action操做後就會觸發一個Job的計算,並交給DAGScheduler來提交,下圖是涉及到Job提交的相關方法調用流程圖。

 

圖5-5 Job提交調用棧

Job由最終的RDD和Action方法封裝而成,SparkContext將Job交給DAGScheduler提交,它會根據RDD的血緣關係構成的DAG進行切分,將一個Job劃分爲若干Stages,具體劃分策略是,由最終的RDD不斷經過依賴回溯判斷父依賴是不是寬依賴,即以Shuffle爲界,劃分Stage,窄依賴的RDD之間被劃分到同一個Stage中,能夠進行pipeline式的計算,如上圖紫色流程部分。劃分的Stages分兩類,一類叫作ResultStage,爲DAG最下游的Stage,由Action方法決定,另外一類叫作ShuffleMapStage,爲下游Stage準備數據,下面看一個簡單的例子WordCount。

 

圖5-6 WordCount實例

Job由saveAsTextFile觸發,該Job由RDD-3和saveAsTextFile方法組成,根據RDD之間的依賴關係從RDD-3開始回溯搜索,直到沒有依賴的RDD-0,在回溯搜索過程當中,RDD-3依賴RDD-2,而且是寬依賴,因此在RDD-2和RDD-3之間劃分Stage,RDD-3被劃到最後一個Stage,即ResultStage中,RDD-2依賴RDD-1,RDD-1依賴RDD-0,這些依賴都是窄依賴,因此將RDD-0、RDD-1和RDD-2劃分到同一個Stage,即ShuffleMapStage中,實際執行的時候,數據記錄會一鼓作氣地執行RDD-0到RDD-2的轉化。不難看出,其本質上是一個深度優先搜索算法。

一個Stage是否被提交,須要判斷它的父Stage是否執行,只有在父Stage執行完畢才能提交當前Stage,若是一個Stage沒有父Stage,那麼從該Stage開始提交Stage提交時會將Task信息(分區信息以及方法等)序列化並被打包成TaskSet交給TaskScheduler,一個Partition對應一個Task,另外一方面TaskScheduler會監控Stage的運行狀態,只有Executor丟失或者Task因爲Fetch失敗才須要從新提交失敗的Stage以調度運行失敗的任務,其餘類型的Task失敗會在TaskScheduler的調度過程當中重試。

相對來講DAGScheduler作的事情較爲簡單,僅僅是在Stage層面上劃分DAG,提交Stage並監控相關狀態信息。TaskScheduler則相對較爲複雜,下面詳細闡述其細節。

5.4 Spark Task級調度

Spark Task的調度是由TaskScheduler來完成,由前文可知,DAGScheduler將Stage打包到TaskSet交給TaskScheduler,TaskScheduler會將TaskSet封裝爲TaskSetManager加入到調度隊列中TaskSetManager結構以下圖所示。

 

圖5-7 TaskManager結構

TaskSetManager負責監控管理同一個Stage中的Tasks,TaskScheduler就是以TaskSetManager爲單元來調度任務。

TaskScheduler初始化後會啓動SchedulerBackend,它負責跟外界打交道,接收Executor的註冊信息,並維護Executor的狀態,因此說SchedulerBackend是管「糧食」的,同時它在啓動後會按期地去「詢問」TaskScheduler有沒有任務要運行,也就是說,它會按期地「問」TaskScheduler「我有這麼餘量,你要不要啊」,TaskScheduler在SchedulerBackend「問」它的時候,會從調度隊列中按照指定的調度策略選擇TaskSetManager去調度運行,大體方法調用流程以下圖所示:

 

圖5-9 task調度流程

3-7中,將TaskSetManager加入rootPool調度池中以後,調用SchedulerBackend的riviveOffers方法給driverEndpoint發送ReviveOffer消息;driverEndpoint收到ReviveOffer消息後調用makeOffers方法,過濾出活躍狀態的Executor(這些Executor都是任務啓動時反向註冊到Driver的Executor),而後將Executor封裝成WorkerOffer對象;準備好計算資源(WorkerOffer)後,taskScheduler基於這些資源調用resourceOfferExecutor上分配task。

5.4.1 調度策略

前面講到,TaskScheduler會先把DAGScheduler給過來的TaskSet封裝成TaskSetManager扔到任務隊列裏,而後再從任務隊列裏按照必定的規則把它們取出來在SchedulerBackend給過來的Executor上運行。這個調度過程實際上仍是比較粗粒度的,是面向TaskSetManager的。

TaskScheduler是以樹的方式來管理任務隊列,樹中的節點類型爲Schdulable,葉子節點爲TaskSetManager,非葉子節點爲Pool,下圖是它們之間的繼承關係。

 

圖5-10 任務隊列繼承關係

調度隊列的層次結構以下圖所示:

 

圖5-11 FIFO調度策略內存結構

TaskScheduler支持兩種調度策略,一種是FIFO,也是默認的調度策略,另外一種是FAIR。在TaskScheduler初始化過程當中會實例化rootPool,表示樹的根節點,是Pool類型。

  1. FIFO調度策略

FIFO調度策略執行步驟以下:

1) s1s2兩個Schedulable的優先級(Schedulable類的一個屬性,記爲priority,值越小,優先級越高);

2) 若是兩個Schedulable的優先級相同,則對s1s2所屬的Stage的身份進行標識進行比較(Schedulable類的一個屬性,記爲priority,值越小,優先級越高);

3) 若是比較的結果小於0,則優先調度s1,不然優先調度s2。

 

圖5-12 FIFO調度策略內存結構

  1. FAIR調度策略

FAIR調度策略的樹結構以下圖所示:

 

圖5-13 FAIR調度策略內存結構

FAIR模式中有一個rootPool和多個子Pool,各個子Pool中存儲着全部待分配的TaskSetMagager。

能夠經過在Properties中指定spark.scheduler.pool屬性,指定調度池中的某個調度池做爲TaskSetManager的父調度池,若是根調度池不存在此屬性值對應的調度池,會建立以此屬性值爲名稱的調度池做爲TaskSetManager的父調度池,並將此調度池做爲根調度池的子調度池。

FAIR模式中須要先對子Pool進行排序,再對子Pool裏面的TaskSetMagager進行排序,由於Pool和TaskSetMagager都繼承了Schedulable特質,所以使用相同的排序算法

排序過程的比較是基於Fair-share來比較的,每一個要排序的對象包含三個屬性: runningTasks值(正在運行的Task數)、minShare值、weight值,比較時會綜合考量runningTasks值,minShare值以及weight值。

注意,minShare、weight的值均在公平調度配置文件fairscheduler.xml中被指定,調度池在構建階段會讀取此文件的相關配置。

1) 若是A對象的runningTasks大於它的minShare,B對象的runningTasks小於它的minShare,那麼B排在A前面;(runningTasks比minShare小的先執行

2) 若是A、B對象的runningTasks都小於它們的minShare,那麼就比較runningTasks與minShare的比值(minShare使用率),誰小誰排前面;(minShare使用率低的先執行

3) 若是A、B對象的runningTasks都大於它們的minShare,那麼就比較runningTasks與weight的比值(權重使用率),誰小誰排前面。(權重使用率低的先執行

4) 若是上述比較均相等,則比較名字。

總體上來講就是經過minShare和weight這兩個參數控制比較過程,能夠作到讓minShare使用率和權重使用率少(實際運行task比例較少)的先運行

FAIR模式排序完成後,全部的TaskSetManager被放入一個ArrayBuffer裏,以後依次被取出併發送給Executor執行。

從調度隊列中拿到TaskSetManager後,因爲TaskSetManager封裝了一個Stage的全部Task,並負責管理調度這些Task,那麼接下來的工做就是TaskSetManager按照必定的規則一個個取出Task給TaskScheduler,TaskScheduler再交給SchedulerBackend去發到Executor上執行。

5.4.2 本地化調度

DAGScheduler切割Job,劃分Stage, 經過調用submitStage來提交一個Stage對應的tasks,submitStage會調用submitMissingTasks,submitMissingTasks 肯定每一個須要計算的 task 的preferredLocations,經過調用getPreferrdeLocations()獲得partition 的優先位置,因爲一個partition對應一個task,此partition的優先位置就是task的優先位置,對於要提交到TaskScheduler的TaskSet中的每個task,該task優先位置與其對應的partition對應的優先位置一致。

從調度隊列中拿到TaskSetManager後,那麼接下來的工做就是TaskSetManager按照必定的規則一個個取出taskTaskScheduler,TaskScheduler再交給SchedulerBackend去發到Executor上執行。前面也提到,TaskSetManager封裝了一個Stage的全部task,並負責管理調度這些task。

根據每一個task的優先位置,肯定task的Locality級別,Locality一共有五種,優先級由高到低順序:

表5-1 Spark本地化等級

名稱

解析

PROCESS_LOCAL

進程本地化,task和數據在同一個Executor中,性能最好。

NODE_LOCAL

節點本地化,task和數據在同一個節點中,可是task和數據不在同一個Executor中,數據須要在進程間進行傳輸。

RACK_LOCAL

機架本地化,task和數據在同一個機架的兩個節點上,數據須要經過網絡在節點之間進行傳輸。

NO_PREF

對於task來講,從哪裏獲取都同樣,沒有好壞之分。

ANY

task和數據能夠在集羣的任何地方,並且不在一個機架中,性能最差。

在調度執行時,Spark調度老是會盡可能讓每一個task以最高的本地性級別來啓動,當一個task以X本地性級別啓動,可是該本地性級別對應的全部節點都沒有空閒資源而啓動失敗,此時並不會立刻下降本地性級別啓動而是在某個時間長度內再次以X本地性級別來啓動該task,若超過限時時間則降級啓動,去嘗試下一個本地性級別,依次類推。

能夠經過調大每一個類別的最大容忍延遲時間,在等待階段對應的Executor可能就會有相應的資源去執行此task,這就在在必定程度上提到了運行性能。

5.4.3 失敗重試與黑名單機制

除了選擇合適的Task調度運行外,還須要監控Task的執行狀態,前面也提到,與外部打交道的是SchedulerBackend,Task被提交到Executor啓動執行後,Executor會將執行狀態上報給SchedulerBackend,SchedulerBackend則告訴TaskScheduler,TaskScheduler找到該Task對應的TaskSetManager,並通知到該TaskSetManager,這樣TaskSetManager就知道Task的失敗與成功狀態,對於失敗的Task,會記錄它失敗的次數,若是失敗次數尚未超過最大重試次數,那麼就把它放回待調度的Task池子中,不然整個Application失敗。

在記錄Task失敗次數過程當中,會記錄它上一次失敗所在的Executor Id和Host,這樣下次再調度這個Task時,會使用黑名單機制,避免它被調度到上一次失敗的節點上,起到必定的容錯做用。黑名單記錄Task上一次失敗所在的Executor Id和Host,以及其對應的「拉黑」時間,「拉黑」時間是指這段時間內不要再往這個節點上調度這個Task了。

6. Spark Shuffle解析

6.1 Shuffle的核心要點

6.1.1 ShuffleMapStage與FinalStage

 

圖6-1 ShuffleMapStage與FinalStage

在劃分stage時,最後一個stage稱爲FinalStage,它本質上是一個ResultStage對象,前面的全部stage被稱爲ShuffleMapStage。

ShuffleMapStage的結束伴隨着shuffle文件的寫磁盤。

ResultStage基本上對應代碼中的action算子,即將一個函數應用在RDD的各個partition的數據集上,意味着一個job的運行結束。

6.1.2 Shuffle中的任務個數

1. maptask個數的肯定

Shuffle過程當中的task個數由RDD分區數決定,而RDD的分區個數與參數spark.default.parallelism有密切關係。

Yarn Cluster模式下,若是沒有手動設置spark.default.parallelism ,則有:

Others: total number of cores on all executor nodes or 2, whichever is larger.

spark.default.parallelism =  max(全部executor使用的core總數, 2)

若是進行了手動配置,則:

spark.default.parallelism = 配置值

還有一個重要的配置:

The maximum number of bytes to pack into a single partition when reading files.

spark.files.maxPartitionBytes = 128 M (默認)

表明着rdd的一個分區能存放數據的最大字節數,若是一個400MB的文件,只分了兩個區,則在action時會發生錯誤。

當一個spark應用程序執行時,生成sparkContext,同時會生成兩個參數,由上面獲得的spark.default.parallelism推導出這兩個參數的值:

sc.defaultParallelism = spark.default.parallelism

sc.defaultMinPartitions = min(spark.default.parallelism,2)

當以上參數肯定後,就能夠推算RDD分區數目了。

 經過scala 集合方式parallelize生成的RDD

val rdd = sc.parallelize(1 to 10)

這種方式下,若是在parallelize操做時沒有指定分區數,則有:

rdd的分區數 = sc.defaultParallelism

 在本地文件系統經過textFile方式生成的RDD

val rdd = sc.textFile(「path/file」)

rdd的分區數 = max(本地file的分片數, sc.defaultMinPartitions)

 HDFS文件系統生成的RDD

rdd的分區數 = max(HDFS文件的Block數目, sc.defaultMinPartitions)

 HBase數據表獲取數據並轉換爲RDD

rdd的分區數 = Table的region個數

 經過獲取json(或者parquet等等)文件轉換成的DataFrame

rdd的分區數 = 該文件在文件系統中存放的Block數目

 Spark Streaming獲取Kafka消息對應的分區數

基於Receiver:

Receiver的方式中,Spark中的partition和kafka中的partition並非相關的,因此若是咱們加大每一個topic的partition數量,

僅僅是增長線程來處理由單一Receiver消費的主題。可是這並無增長Spark在處理數據上的並行度。

基於DirectDStream:

Spark會建立跟Kafka partition同樣多的RDD partition,而且會並行從Kafka中讀取數據,因此在Kafka partition和RDD partition之間,有一個一對一的映射關係。

2. reducetask個數的肯定

Reduce端進行數據的聚合,一部分聚合算子能夠手動指定reducetask的並行度,若是沒有指定,則以map端的最後一個RDD的分區數做爲其分區數,那麼分區數就決定了reduce端的task的個數。

6.1.3 reduce端數據的讀取

根據stage的劃分咱們知道,map端task和reduce端task不在相同的stage中,map task位於ShuffleMapStage,reduce task位於ResultStage,map task會先執行,那麼後執行的reduce task如何知道從哪裏去拉取map task落盤後的數據呢?

reduce端的數據拉取過程以下:

  1. map task 執行完畢後會將計算狀態以及磁盤小文件位置等信息封裝到mapStatue對象中,而後由本進程中的MapOutPutTrackerWorker對象將mapStatus對象發送給Driver進程的MapOutPutTrackerMaster對象;
  2. reduce task開始執行以前會先讓本進程中的MapOutputTrackerWorker向Driver進程中的MapoutPutTrakcerMaster發動請求,請求磁盤小文件位置信息;
  3. 當全部的Map task執行完畢後,Driver進程中的MapOutPutTrackerMaster就掌握了全部的磁盤小文件的位置信息。此時MapOutPutTrackerMaster會告訴MapOutPutTrackerWorker磁盤小文件的位置信息;
  4. 完成以前的操做以後,由BlockTransforService去Executor所在的節點拉數據,默認會啓動五個子線程。每次拉取的數據量不能超過48M(reduce task每次最多拉取48M數據,將拉來的數據存儲到Executor內存的20%內存中)。

6.2 HashShuffle解析

如下的討論都假設每一個Executor有1個CPU core。

1. 未經優化的HashShuffleManager

shuffle write階段,主要就是在一個stage結束計算以後,爲了下一個stage能夠執行shuffle類的算子(好比reduceByKey),而將每一個task處理的數據按key進行「劃分」。所謂「劃分」,就是對相同的key執行hash算法,從而將相同key都寫入同一個磁盤文件中,而每個磁盤文件都只屬於下游stage的一個task。在將數據寫入磁盤以前,會先將數據寫入內存緩衝中,當內存緩衝填滿以後,纔會溢寫到磁盤文件中去。

下一個stage的task有多少個,當前stage的每一個task就要建立多少份磁盤文件。好比下一個stage總共有100個task,那麼當前stage的每一個task都要建立100份磁盤文件。若是當前stage有50個task,總共有10個Executor,每一個Executor執行5個task,那麼每一個Executor上總共就要建立500個磁盤文件,全部Executor上會建立5000個磁盤文件。因而可知,未經優化的shuffle write操做所產生的磁盤文件的數量是極其驚人的。

shuffle read階段,一般就是一個stage剛開始時要作的事情。此時該stage的每個task就須要將上一個stage的計算結果中的全部相同key,從各個節點上經過網絡都拉取到本身所在的節點上,而後進行key的聚合或鏈接等操做。因爲shuffle write的過程當中,map task給下游stage的每一個reduce task都建立了一個磁盤文件,所以shuffle read的過程當中,每一個reduce task只要從上游stage的全部map task所在節點上,拉取屬於本身的那一個磁盤文件便可。

shuffle read的拉取過程是一邊拉取一邊進行聚合的。每一個shuffle read task都會有一個本身的buffer緩衝,每次都只能拉取與buffer緩衝相同大小的數據,而後經過內存中的一個Map進行聚合等操做。聚合完一批數據後,再拉取下一批數據,並放到buffer緩衝中進行聚合操做。以此類推,直到最後將全部數據到拉取完,並獲得最終的結果。

未優化的HashShuffleManager工做原理如圖1-7所示:

 

圖6-2 未優化的HashShuffleManager工做原理

2. 優化後的HashShuffleManager

爲了優化HashShuffleManager咱們能夠設置一個參數,spark.shuffle. consolidateFiles,該參數默認值爲false,將其設置爲true便可開啓優化機制,一般來講,若是咱們使用HashShuffleManager,那麼都建議開啓這個選項。

開啓consolidate機制以後,在shuffle write過程當中,task就不是爲下游stage的每一個task建立一個磁盤文件了,此時會出現shuffleFileGroup的概念,每一個shuffleFileGroup會對應一批磁盤文件,磁盤文件的數量與下游stage的task數量是相同的。一個Executor上有多少個CPU core,就能夠並行執行多少個task。而第一批並行執行的每一個task都會建立一個shuffleFileGroup,並將數據寫入對應的磁盤文件內。

Executor的CPU core執行完一批task,接着執行下一批task時,下一批task就會複用以前已有的shuffleFileGroup,包括其中的磁盤文件,也就是說,此時task會將數據寫入已有的磁盤文件中,而不會寫入新的磁盤文件中。所以,consolidate機制容許不一樣的task複用同一批磁盤文件,這樣就能夠有效將多個task的磁盤文件進行必定程度上的合併,從而大幅度減小磁盤文件的數量,進而提高shuffle write的性能。

假設第二個stage有100個task,第一個stage有50個task,總共仍是有10個Executor(Executor CPU個數爲1),每一個Executor執行5個task。那麼本來使用未經優化的HashShuffleManager時,每一個Executor會產生500個磁盤文件,全部Executor會產生5000個磁盤文件的。可是此時通過優化以後,每一個Executor建立的磁盤文件的數量的計算公式爲:CPU core的數量 * 下一個stage的task數量,也就是說,每一個Executor此時只會建立100個磁盤文件,全部Executor只會建立1000個磁盤文件。

優化後的HashShuffleManager工做原理如圖1-8所示:

 

圖6-3 優化後的HashShuffleManager工做原理d

6.3 SortShuffle解析

SortShuffleManager的運行機制主要分紅兩種,一種是普通運行機制,另外一種是bypass運行機制。當shuffle read task的數量小於等於spark.shuffle.sort. bypassMergeThreshold參數的值時(默認爲200),就會啓用bypass機制。

  1. 普通運行機制

在該模式下,數據會先寫入一個內存數據結構中,此時根據不一樣的shuffle算子,可能選用不一樣的數據結構。若是是reduceByKey這種聚合類的shuffle算子,那麼會選用Map數據結構,一邊經過Map進行聚合,一邊寫入內存;若是是join這種普通的shuffle算子,那麼會選用Array數據結構,直接寫入內存。接着,每寫一條數據進入內存數據結構以後,就會判斷一下,是否達到了某個臨界閾值。若是達到臨界閾值的話,那麼就會嘗試將內存數據結構中的數據溢寫到磁盤,而後清空內存數據結構。

在溢寫到磁盤文件以前,會先根據key對內存數據結構中已有的數據進行排序。排序事後,會分批將數據寫入磁盤文件。默認的batch數量是10000條,也就是說,排序好的數據,會以每批1萬條數據的形式分批寫入磁盤文件。寫入磁盤文件是經過Java的BufferedOutputStream實現的。BufferedOutputStream是Java的緩衝輸出流,首先會將數據緩衝在內存中,當內存緩衝滿溢以後再一次寫入磁盤文件中,這樣能夠減小磁盤IO次數,提高性能。

一個task將全部數據寫入內存數據結構的過程當中,會發生屢次磁盤溢寫操做,也就會產生多個臨時文件。最後會將以前全部的臨時磁盤文件都進行合併,這就是merge過程,此時會將以前全部臨時磁盤文件中的數據讀取出來,而後依次寫入最終的磁盤文件之中。此外,因爲一個task就只對應一個磁盤文件,也就意味着該task爲下游stage的task準備的數據都在這一個文件中,所以還會單獨寫一份索引文件,其中標識了下游各個task的數據在文件中的start offset與end offset。

SortShuffleManager因爲有一個磁盤文件merge的過程,所以大大減小了文件數量。好比第一個stage有50個task,總共有10個Executor,每一個Executor執行5個task,而第二個stage有100個task。因爲每一個task最終只有一個磁盤文件,所以此時每一個Executor上只有5個磁盤文件,全部Executor只有50個磁盤文件。

普通運行機制的SortShuffleManager工做原理如圖1-9所示:

 

圖6-4 普通運行機制的SortShuffleManager工做原理

  1. bypass運行機制

bypass運行機制的觸發條件以下:

l shuffle map task數量小於spark.shuffle.sort.bypassMergeThreshold參數的值。

不是聚合類的shuffle算子。

此時,每一個task會爲每一個下游task都建立一個臨時磁盤文件,並將數據按key進行hash而後根據key的hash值,將key寫入對應的磁盤文件之中。固然,寫入磁盤文件時也是先寫入內存緩衝,緩衝寫滿以後再溢寫到磁盤文件的。最後,一樣會將全部臨時磁盤文件都合併成一個磁盤文件,並建立一個單獨的索引文件。

該過程的磁盤寫機制其實跟未經優化的HashShuffleManager是如出一轍的,由於都要建立數量驚人的磁盤文件,只是在最後會作一個磁盤文件的合併而已。所以少許的最終磁盤文件,也讓該機制相對未經優化的HashShuffleManager來講,shuffle read的性能會更好。

而該機制與普通SortShuffleManager運行機制的不一樣在於:第一,磁盤寫機制不一樣;第二,不會進行排序。也就是說,啓用該機制的最大好處在於,shuffle write過程當中,不須要進行數據的排序操做,也就節省掉了這部分的性能開銷。

普通運行機制的SortShuffleManager工做原理如圖1-10所示:

 

圖6-5 bypass運行機制的SortShuffleManager工做原理

7. Spark 內存管理

在執行Spark 的應用程序時,Spark 集羣會啓動 Driver 和 Executor 兩種 JVM 進程,前者爲主控進程,負責建立 Spark 上下文,提交 Spark 做業(Job),並將做業轉化爲計算任務(Task),在各個 Executor 進程間協調任務的調度,後者負責在工做節點上執行具體的計算任務,並將結果返回給 Driver,同時爲須要持久化的 RDD 提供存儲功能。因爲 Driver 的內存管理相對來講較爲簡單,本節主要對 Executor 的內存管理進行分析,下文中的 Spark 內存均特指 Executor 的內存。

7.1 堆內和堆外內存規劃

做爲一個 JVM 進程,Executor 的內存管理創建在 JVM 的內存管理之上,Spark 對 JVM 的堆內(On-heap)空間進行了更爲詳細的分配,以充分利用內存。同時,Spark 引入了堆外(Off-heap)內存,使之能夠直接在工做節點的系統內存中開闢空間,進一步優化了內存的使用。

堆內內存受到JVM統一管理,堆外內存是直接向操做系統進行內存的申請和釋放。

 

圖7-1 Executor堆內與堆外內存

  1. 堆內內存

堆內內存的大小,由 Spark 應用程序啓動時的 –executor-memory 或 spark.executor.memory 參數配置。Executor 內運行的併發任務共享 JVM 堆內內存,這些任務在緩存 RDD 數據和廣播(Broadcast)數據時佔用的內存被規劃爲存儲(Storage)內存,而這些任務在執行 Shuffle 時佔用的內存被規劃爲執行(Execution)內存,剩餘的部分不作特殊規劃,那些 Spark 內部的對象實例,或者用戶定義的 Spark 應用程序中的對象實例,均佔用剩餘的空間。不一樣的管理模式下,這三部分佔用的空間大小各不相同。

Spark 對堆內內存的管理是一種邏輯上的」規劃式」的管理,由於對象實例佔用內存的申請和釋放都由 JVM 完成,Spark 只能在申請後和釋放前記錄這些內存,咱們來看其具體流程:

申請內存流程以下:

  1. Spark 在代碼中 new 一個對象實例;
  2. JVM 從堆內內存分配空間,建立對象並返回對象引用;
  3. Spark 保存該對象的引用,記錄該對象佔用的內存。

釋放內存流程以下:

1. Spark記錄該對象釋放的內存,刪除該對象的引用;

2. 等待JVM的垃圾回收機制釋放該對象佔用的堆內內存。

咱們知道,JVM 的對象能夠以序列化的方式存儲,序列化的過程是將對象轉換爲二進制字節流,本質上能夠理解爲將非連續空間的鏈式存儲轉化爲連續空間或塊存儲,在訪問時則須要進行序列化的逆過程——反序列化,將字節流轉化爲對象,序列化的方式能夠節省存儲空間,但增長了存儲和讀取時候的計算開銷。

對於 Spark 中序列化的對象,因爲是字節流的形式,其佔用的內存大小可直接計算,而對於非序列化的對象,其佔用的內存是經過週期性地採樣近似估算而得,即並非每次新增的數據項都會計算一次佔用的內存大小,這種方法下降了時間開銷可是有可能偏差較大,致使某一時刻的實際內存有可能遠遠超出預期。此外,在被 Spark 標記爲釋放的對象實例,頗有可能在實際上並無被 JVM 回收,致使實際可用的內存小於 Spark 記錄的可用內存。因此 Spark 並不能準確記錄實際可用的堆內內存,從而也就沒法徹底避免內存溢出(OOM, Out of Memory)的異常。

雖然不能精準控制堆內內存的申請和釋放,但 Spark 經過對存儲內存和執行內存各自獨立的規劃管理,能夠決定是否要在存儲內存裏緩存新的 RDD,以及是否爲新的任務分配執行內存,在必定程度上能夠提高內存的利用率,減小異常的出現。

  1. 堆外內存

爲了進一步優化內存的使用以及提升 Shuffle 時排序的效率,Spark 引入了堆外(Off-heap)內存,使之能夠直接在工做節點的系統內存中開闢空間,存儲通過序列化的二進制數據。

堆外內存意味着把內存對象分配在Java虛擬機的堆之外的內存,這些內存直接受操做系統管理(而不是虛擬機)。這樣作的結果就是能保持一個較小的堆,以減小垃圾收集對應用的影響。

利用 JDK Unsafe API(從 Spark 2.0 開始,在管理堆外的存儲內存時再也不基於 Tachyon,而是與堆外的執行內存同樣,基於 JDK Unsafe API 實現),Spark 能夠直接操做系統堆外內存,減小了沒必要要的內存開銷,以及頻繁的 GC 掃描和回收,提高了處理性能。堆外內存能夠被精確地申請和釋放(堆外內存之因此可以被精確的申請和釋放,是因爲內存的申請和釋放再也不經過JVM機制,而是直接向操做系統申請,JVM對於內存的清理是沒法準確指定時間點的,所以沒法實現精確的釋放),並且序列化的數據佔用的空間能夠被精確計算,因此相比堆內內存來講下降了管理的難度,也下降了偏差。

在默認狀況下堆外內存並不啓用,可經過配置 spark.memory.offHeap.enabled 參數啓用,並由 spark.memory.offHeap.size 參數設定堆外空間的大小。除了沒有 other 空間,堆外內存與堆內內存的劃分方式相同,全部運行中的併發任務共享存儲內存和執行內存。

*該部份內存主要用於程序的共享庫、Perm Space、線程Stack和一些Memory mapping等, 或者類C方式allocate object)

7.2 內存空間分配

1. 靜態內存管理

Spark 最初採用的靜態內存管理機制下,存儲內存、執行內存和其餘內存的大小在 Spark 應用程序運行期間均爲固定的,但用戶能夠應用程序啓動前進行配置,堆內內存的分配如圖 2 所示:

 

圖7-2 靜態內存管理——堆內內存

能夠看到,可用的堆內內存的大小須要按照代碼清單1-1的方式計算:

代碼清單1-1 堆內內存計算公式

可用的存儲內存 = systemMaxMemory * spark.storage.memoryFraction * spark.storage.safety Fraction

可用的執行內存 = systemMaxMemory * spark.shuffle.memoryFraction * spark.shuffle.safety Fraction

其中 systemMaxMemory 取決於當前 JVM 堆內內存的大小,最後可用的執行內存或者存儲內存要在此基礎上與各自的 memoryFraction 參數和 safetyFraction 參數相乘得出。上述計算公式中的兩個 safetyFraction 參數,其意義在於在邏輯上預留出 1-safetyFraction 這麼一塊保險區域,下降因實際內存超出當前預設範圍而致使 OOM 的風險(上文提到,對於非序列化對象的內存採樣估算會產生偏差)。值得注意的是,這個預留的保險區域僅僅是一種邏輯上的規劃,在具體使用時 Spark 並無區別對待,和」其它內存」同樣交給了 JVM 去管理。

Storage內存和Execution內存都有預留空間,目的是防止OOM,由於Spark堆內內存大小的記錄是不許確的,須要留出保險區域。

堆外的空間分配較爲簡單,只有存儲內存和執行內存,如圖1-3所示。可用的執行內存和存儲內存佔用的空間大小直接由參數spark.memory.storageFraction 決定,因爲堆外內存佔用的空間能夠被精確計算,因此無需再設定保險區域。

 

圖7-3 靜態內存管理

靜態內存管理機制實現起來較爲簡單,但若是用戶不熟悉 Spark 的存儲機制,或沒有根據具體的數據規模和計算任務或作相應的配置,很容易形成」一半海水,一半火焰」的局面,即存儲內存和執行內存中的一方剩餘大量的空間,而另外一方卻早早被佔滿,不得不淘汰或移出舊的內容以存儲新的內容。因爲新的內存管理機制的出現,這種方式目前已經不多有開發者使用,出於兼容舊版本的應用程序的目的,Spark 仍然保留了它的實現。

  1. 統一內存管理

Spark 1.6 以後引入的統一內存管理機制,與靜態內存管理的區別在於存儲內存和執行內存共享同一塊空間,能夠動態佔用對方的空閒區域,統一內存管理的堆內內存結構如圖 1-4所示:

 

圖7-4 統一內存管理——堆內內存

統一內存管理的堆外內存結構如圖 1-5所示:

 

圖7-5 統一內存管理——堆外內存

其中最重要的優化在於動態佔用機制,其規則以下:

  1. 設定基本的存儲內存和執行內存區域(spark.storage.storageFraction 參數),該設定肯定了雙方各自擁有的空間的範圍;
  2. 雙方的空間都不足時,則存儲到硬盤;若己方空間不足而對方空餘時,可借用對方的空間;(存儲空間不足是指不足以放下一個完整的 Block)

3. 執行內存的空間被對方佔用後,可以讓對方將佔用的部分轉存到硬盤,而後」歸還」借用的空間;

4. 存儲內存的空間被對方佔用後,沒法讓對方」歸還」,由於須要考慮 Shuffle 過程當中的不少因素,實現起來較爲複雜。

統一內存管理的動態佔用機制如圖 1-6所示:

 

圖7-6 同一內存管理——動態佔用機制

憑藉統一內存管理機制,Spark 在必定程度上提升了堆內和堆外內存資源的利用率,下降了開發者維護 Spark 內存的難度,但並不意味着開發者能夠高枕無憂。若是存儲內存的空間太大或者說緩存的數據過多,反而會致使頻繁的全量垃圾回收,下降任務執行時的性能,由於緩存的 RDD 數據一般都是長期駐留內存的。因此要想充分發揮 Spark 的性能,須要開發者進一步瞭解存儲內存和執行內存各自的管理方式和實現原理。

7.3 存儲內存管理

  1. RDD的持久化機制

彈性分佈式數據集(RDD)做爲 Spark 最根本的數據抽象,是隻讀的分區記錄(Partition)的集合,只能基於在穩定物理存儲中的數據集上建立,或者在其餘已有的 RDD 上執行轉換(Transformation)操做產生一個新的 RDD。轉換後的 RDD 與原始的 RDD 之間產生的依賴關係,構成了血統(Lineage)。憑藉血統,Spark 保證了每個 RDD 均可以被從新恢復。但 RDD 的全部轉換都是惰性的,即只有當一個返回結果給 Driver 的行動(Action)發生時,Spark 纔會建立任務讀取 RDD,而後真正觸發轉換的執行。

Task 在啓動之初讀取一個分區時,會先判斷這個分區是否已經被持久化,若是沒有則須要檢查 Checkpoint 或按照血統從新計算。因此若是一個 RDD 上要執行屢次行動,能夠在第一次行動中使用 persist 或 cache 方法,在內存或磁盤中持久化或緩存這個 RDD,從而在後面的行動時提高計算速度。

事實上,cache 方法是使用默認的 MEMORY_ONLY 的存儲級別將 RDD 持久化到內存,故緩存是一種特殊的持久化。 堆內和堆外存儲內存的設計,即可以對緩存 RDD 時使用的內存作統一的規劃和管理

RDD 的持久化由 Spark 的 Storage 模塊負責,實現了 RDD 與物理存儲的解耦合。Storage 模塊負責管理 Spark 在計算過程當中產生的數據,將那些在內存或磁盤、在本地或遠程存取數據的功能封裝了起來。在具體實現時 Driver 端和 Executor 端的 Storage 模塊構成了主從式的架構,即 Driver 端的 BlockManager 爲 Master,Executor 端的 BlockManager 爲 Slave。

Storage 模塊在邏輯上以 Block 爲基本存儲單位,RDD 的每一個 Partition 通過處理後惟一對應一個 BlockBlockId 的格式爲 rdd_RDD-ID_PARTITION-ID )。Driver端的Master 負責整個 Spark 應用程序的 Block 的元數據信息的管理和維護,而Executor端的 Slave 須要將 Block 的更新等狀態上報到 Master,同時接收 Master 的命令,例如新增或刪除一個 RDD。

 

圖7-7 Storage模塊示意圖

在對 RDD 持久化時,Spark 規定了 MEMORY_ONLY、MEMORY_AND_DISK 等 7 種不一樣的存儲級別 ,而存儲級別是如下 5 個變量的組合:

代碼清單5-1 resourceOffer代碼

class StorageLevel private(

private var _useDisk: Boolean, //磁盤

private var _useMemory: Boolean, //這裏實際上是指堆內內存

private var _useOffHeap: Boolean, //堆外內存

private var _deserialized: Boolean, //是否爲非序列化

private var _replication: Int = 1 //副本個數

)

Spark中7種存儲級別以下:

表5-1 Spark持久化級別

持久化級別

含義

MEMORY_ONLY

以非序列化的Java對象的方式持久化在JVM內存中。若是內存沒法徹底存儲RDD全部的partition,那麼那些沒有持久化的partition就會在下一次須要使用它們的時候,從新被計算

MEMORY_AND_DISK

同上,可是當某些partition沒法存儲在內存中時,會持久化到磁盤中。下次須要使用這些partition時,須要從磁盤上讀取

MEMORY_ONLY_SER

同MEMORY_ONLY,可是會使用Java序列化方式,將Java對象序列化後進行持久化。能夠減小內存開銷,可是須要進行反序列化,所以會加大CPU開銷

MEMORY_AND_DISK_SER

同MEMORY_AND_DISK,可是使用序列化方式持久化Java對象

DISK_ONLY

 使用非序列化Java對象的方式持久化,徹底存儲到磁盤上

MEMORY_ONLY_2

MEMORY_AND_DISK_2

等等

 若是是尾部加了2的持久化級別,表示將持久化數據複用一份,保存到其餘節點,從而在數據丟失時,不須要再次計算,只須要使用備份數據便可

經過對數據結構的分析,能夠看出存儲級別從三個維度定義了 RDD 的 Partition(同時也就是 Block)的存儲方式:

1) 存儲位置:磁盤/堆內內存/堆外內存。如 MEMORY_AND_DISK 是同時在磁盤和堆內內存上存儲,實現了冗餘備份。OFF_HEAP 則是隻在堆外內存存儲,目前選擇堆外內存時不能同時存儲到其餘位置。

2) 存儲形式Block 緩存到存儲內存後,是否爲非序列化的形式。如 MEMORY_ONLY 是非序列化方式存儲,OFF_HEAP 是序列化方式存儲。

3) 副本數量:大於 1 時須要遠程冗餘備份到其餘節點。如 DISK_ONLY_2 須要遠程備份 1 個副本。

  1. RDD的緩存過程

RDD 在緩存到存儲內存以前,Partition 中的數據通常以迭代器(Iterator)的數據結構來訪問,這是 Scala 語言中一種遍歷數據集合的方法。經過 Iterator 能夠獲取分區中每一條序列化或者非序列化的數據項(Record),這些 Record 的對象實例在邏輯上佔用了 JVM 堆內內存的 other 部分的空間,同一 Partition 的不一樣 Record 的存儲空間並不連續。

RDD 在緩存到存儲內存以後,Partition 被轉換成 Block,Record 在堆內或堆外存儲內存中佔用一塊連續的空間。Partition由不連續的存儲空間轉換爲連續存儲空間的過程,Spark稱之爲"展開"(Unroll)

Block 有序列化和非序列化兩種存儲格式,具體以哪一種方式取決於該 RDD 的存儲級別。非序列化的 Block 以一種 DeserializedMemoryEntry 的數據結構定義,用一個數組存儲全部的對象實例,序列化的 Block 則以 SerializedMemoryEntry的數據結構定義,用字節緩衝區(ByteBuffer)來存儲二進制數據。每一個 Executor 的 Storage 模塊用一個鏈式 Map 結構(LinkedHashMap)來管理堆內和堆外存儲內存中全部的 Block 對象的實例,對這個 LinkedHashMap 新增和刪除間接記錄了內存的申請和釋放

由於不能保證存儲空間能夠一次容納 Iterator 中的全部數據,當前的計算任務在 Unroll 時要向 MemoryManager 申請足夠的 Unroll 空間來臨時佔位,空間不足則 Unroll 失敗,空間足夠時能夠繼續進行。

對於序列化的 Partition,其所需的 Unroll 空間能夠直接累加計算,一次申請。

對於非序列化的 Partition 則要在遍歷 Record 的過程當中依次申請,即每讀取一條 Record,採樣估算其所需的 Unroll 空間並進行申請,空間不足時能夠中斷,釋放已佔用的 Unroll 空間。

若是最終 Unroll 成功,當前 Partition 所佔用的 Unroll 空間被轉換爲正常的緩存 RDD 的存儲空間,以下圖所示。

 

圖7-8 Spark Unroll

在靜態內存管理時,Spark 在存儲內存中專門劃分了一塊 Unroll 空間,其大小是固定的,統一內存管理時則沒有對 Unroll 空間進行特別區分,當存儲空間不足時會根據動態佔用機制進行處理。

  1. 淘汰與落盤

因爲同一個 Executor 的全部的計算任務共享有限的存儲內存空間,當有新的 Block 須要緩存可是剩餘空間不足且沒法動態佔用時,就要對 LinkedHashMap 中的舊 Block 進行淘汰(Eviction),而被淘汰的 Block 若是其存儲級別中同時包含存儲到磁盤的要求,則要對其進行落盤(Drop),不然直接刪除該 Block

存儲內存的淘汰規則爲:

被淘汰的舊 Block 要與新 Block 的 MemoryMode 相同,即同屬於堆外或堆內內存;

新舊 Block 不能屬於同一個 RDD,避免循環淘汰;

Block 所屬 RDD 不能處於被讀狀態,避免引起一致性問題;

遍歷 LinkedHashMap 中 Block,按照最近最少使用(LRU)的順序淘汰,直到知足新 Block 所需的空間。其中 LRU 是 LinkedHashMap 的特性。

落盤的流程則比較簡單,若是其存儲級別符合_useDisk 爲 true 的條件,再根據其_deserialized 判斷是不是非序列化的形式,如果則對其進行序列化,最後將數據存儲到磁盤,在 Storage 模塊中更新其信息。

7.4 執行內存管理

執行內存主要用來存儲任務在執行 Shuffle 時佔用的內存,Shuffle 是按照必定規則對 RDD 數據從新分區的過程,咱們來看 Shuffle 的 Write 和 Read 兩階段對執行內存的使用:

l Shuffle Write

map 端會採用 ExternalSorter 進行外排,在內存中存儲數據時主要佔用堆內執行空間。

l Shuffle Read

1) 在對 reduce 端的數據進行聚合時,要將數據交給 Aggregator 處理,在內存中存儲數據時佔用堆內執行空間。

2) 若是須要進行最終結果排序,則要將再次將數據交給 ExternalSorter 處理,佔用堆內執行空間。

ExternalSorter 和 Aggregator 中,Spark 會使用一種叫 AppendOnlyMap 的哈希表在堆內執行內存中存儲數據,但在 Shuffle 過程當中全部數據並不能都保存到該哈希表中,當這個哈希表佔用的內存會進行週期性地採樣估算,當其大到必定程度,沒法再從 MemoryManager 申請到新的執行內存時,Spark 就會將其所有內容存儲到磁盤文件中,這個過程被稱爲溢存(Spill),溢存到磁盤的文件最後會被歸併(Merge)。

Spark 的存儲內存和執行內存有着大相徑庭的管理方式:對於存儲內存來講,Spark 用一個 LinkedHashMap 來集中管理全部的 Block,Block 由須要緩存的 RDD 的 Partition 轉化而成;而對於執行內存,Spark 用 AppendOnlyMap 來存儲 Shuffle 過程當中的數據,在 Tungsten 排序中甚至抽象成爲頁式內存管理,開闢了全新的 JVM 內存管理機制。

8. Spark 核心組件解析

8.1 BlockManager數據存儲與管理機制

BlockManager是整個Spark底層負責數據存儲與管理的一個組件,Driver和Executor的全部數據都由對應的BlockManager進行管理。

Driver上有BlockManagerMaster,負責對各個節點上的BlockManager內部管理的數據的元數據進行維護,好比block的增刪改等操做,都會在這裏維護好元數據的變動。

每一個節點都有一個BlockManager,每一個BlockManager建立以後,第一件事即便去向BlockManagerMaster進行註冊,此時BlockManagerMaster會爲其長難句對應的BlockManagerInfo。

BlockManager運行原理以下圖所示:

 

圖8-1 BlockManager原理

BlockManagerMaster與BlockManager的關係很是像NameNode與DataNode的關係,BlockManagerMaster中保存中BlockManager內部管理數據的元數據,進行維護,當BlockManager進行Block增刪改等操做時,都會在BlockManagerMaster中進行元數據的變動,這與NameNode維護DataNode的元數據信息,DataNode中數據發生變化時NameNode中的元數據信息也會相應變化是一致的。

每一個節點上都有一個BlockManager,BlockManager中有3個很是重要的組件:

  • DiskStore:負責對磁盤數據進行讀寫;
  • MemoryStore:負責對內存數據進行讀寫;
  • BlockTransferService:負責創建BlockManager到遠程其餘節點的BlockManager的鏈接,負責對遠程其餘節點的BlockManager的數據進行讀寫;

每一個BlockManager建立以後,作的第一件事就是想BlockManagerMaster進行註冊,此時BlockManagerMaster會爲其建立對應的BlockManagerInfo。

使用BlockManager進行寫操做時,好比說,RDD運行過程當中的一些中間數據,或者咱們手動指定了persist(),會優先將數據寫入內存中,若是內存大小不夠,會使用本身的算法,將內存中的部分數據寫入磁盤;此外,若是persist()指定了要replica,那麼會使用BlockTransferService將數據replicate一份到其餘節點的BlockManager上去。

使用BlockManager進行讀操做時,好比說,shuffleRead操做,若是能從本地讀取,就利用DiskStore或者MemoryStore從本地讀取數據,可是本地沒有數據的話,那麼會用BlockTransferService與有數據的BlockManager創建鏈接,而後用BlockTransferService從遠程BlockManager讀取數據;例如,shuffle Read操做中,頗有可能要拉取的數據在本地沒有,那麼此時就會到遠程有數據的節點上,找那個節點的BlockManager來拉取須要的數據。

只要使用BlockManager執行了數據增刪改的操做,那麼必須將Block的BlockStatus上報到BlockManagerMaster,在BlockManagerMaster上會對指定BlockManager的BlockManagerInfo內部的BlockStatus進行增刪改操做,從而達到元數據的維護功能。

8.2 Spark 共享變量底層實現

Spark一個很是重要的特性就是共享變量。

默認狀況下,若是在一個算子的函數中使用到了某個外部的變量,那麼這個變量的值會被拷貝到每一個task中,此時每一個task只能操做本身的那份變量副本。若是多個task想要共享某個變量,那麼這種方式是作不到的。

Spark爲此提供了兩種共享變量,一種是Broadcast Variable(廣播變量),另外一種是Accumulator(累加變量)。Broadcast Variable會將用到的變量,僅僅爲每一個節點拷貝一份,即每一個Executor拷貝一份,更大的用途是優化性能,減小網絡傳輸以及內存損耗。Accumulator則能夠讓多個task共同操做一份變量,主要能夠進行累加操做。Broadcast Variable是共享讀變量,task不能去修改它,而Accumulator可讓多個task操做一個變量。

8.2.1 廣播變量

廣播變量容許編程者在每一個Executor上保留外部數據的只讀變量,而不是給每一個任務發送一個副本

每一個task都會保存一份它所使用的外部變量的副本,當一個Executor上的多個task都使用一個大型外部變量時,對於Executor內存的消耗是很是大的,所以,咱們能夠將大型外部變量封裝爲廣播變量,此時一個Executor保存一個變量副本,此Executor上的全部task共用此變量,再也不是一個task單獨保存一個副本,這在必定程度上下降了Spark任務的內存佔用。

 

圖8-2 task使用外部變量

 

圖8-3 使用廣播變量

Spark還嘗試使用高效的廣播算法分發廣播變量,以下降通訊成本。

Spark提供的Broadcast Variable是隻讀的,而且在每一個Executor上只會有一個副本,而不會爲每一個task都拷貝一份副本,所以,它的最大做用,就是減小變量到各個節點的網絡傳輸消耗,以及在各個節點上的內存消耗。此外,Spark內部也使用了高效的廣播算法來減小網絡消耗。

能夠經過調用SparkContext的broadcast()方法來針對每一個變量建立廣播變量。而後在算子的函數內,使用到廣播變量時,每一個Executor只會拷貝一份副本了,每一個task可使用廣播變量的value()方法獲取值。

在任務運行時,Executor並不獲取廣播變量,當task執行到 使用廣播變量的代碼時,會向Executor的內存中請求廣播變量,以下圖所示:

 

圖8-4 task向Executor請求廣播變量

以後Executor會經過BlockManager向Driver拉取廣播變量,而後提供給task進行使用,以下圖所示:

 

圖8-5 Executor從Driver拉取廣播變量

廣播大變量是Spark中經常使用的基礎優化方法,經過減小內存佔用實現任務執行性能的提高。

8.2.2 累加器

累加器(accumulator):Accumulator是僅僅被相關操做累加的變量,所以能夠在並行中被有效地支持。它們可用於實現計數器(如MapReduce)或總和計數。

Accumulator是存在於Driver端的,集羣上運行的task進行Accumulator的累加,隨後把值發到Driver端,在Driver端彙總Spark UI在SparkContext建立時被建立,即在Driver端被建立,所以它能夠讀取Accumulator的數值),因爲Accumulator存在於Driver端,從節點讀取不到Accumulator的數值。

Spark提供的Accumulator主要用於多個節點對一個變量進行共享性的操做Accumulator只提供了累加的功能,可是卻給咱們提供了多個task對於同一個變量並行操做的功能,可是task只能對Accumulator進行累加操做,不能讀取它的值,只有Driver程序能夠讀取Accumulator的值。

Accumulator的底層原理以下圖所示:

 

圖8-6 累加器原理

9. 總結

Spark的內核原理對於更好的使用Spark完成開發任務有着很是重要的做用,同時Spark的內核知識也是面試過程當中常常被問到的知識點。

在本課程的學習中,咱們對Spark的部署模式、通訊架構、任務調度機制、Shuffle過程、內存管理機制以及Spark的核心組件進行了詳細分析,這些內容都是Spark最爲重要的架構原理,但願在以後的學習中你們能夠不斷深化對於Spark內核架構的理解,在更高的層次上去使用Spark技術框架。

相關文章
相關標籤/搜索