大數據技術之_19_Spark學習_06_Spark 源碼解析 + Spark 通訊架構、腳本解析、standalone 模式啓動、提交流程 + Spark Shuffle 過程 + Spark 內存

第1章 Spark 總體概述1.1 總體概念1.2 RDD 抽象1.3 計算抽象1.4 集羣模式1.5 RPC 網絡通訊抽象1.6 啓動 Standalone 集羣1.7 核心組件1.8 核心組件交互流程1.9 Block 管理1.10總體應用第2章 Spark 通訊架構2.1 通訊組件概覽2.2 Endpoint 啓動過程2.3 Endpoint Send&Ask 流程2.4 Endpoint Receive 流程2.5 Endpoint Inbox 處理流程2.6 Endpoint 畫像第3章 腳本解析3.1 start-daemon.sh3.2 spark-class3.3 start-master.sh3.4 start-slaves.sh3.5 start-all.sh3.6 spark-submit第4章 Master 節點啓動4.1 腳本概覽4.2 啓動流程4.3 OnStart 監聽事件4.4 RpcMessage 處理 (receiveAndReply)4.5 OneWayMessage 處理 (receive)4.6 Master 對 RpcMessage/OneWayMessage 處理邏輯第5章 Worker 節點啓動5.1 腳本概覽5.2 啓動流程5.3 OnStart 監聽事件5.4 RpcMessage 處理 (receiveAndReply)5.5 OneWayMessage 處理 (receive)第6章 Client 啓動流程6.1 腳本概覽6.2 SparkSubmit 啓動流程6.3 Client 啓動流程6.4 Client 的 OnStart 監聽事件6.5 RpcMessage 處理 (receiveAndReply)6.6 OneWayMessage 處理(receive)第7章 Driver 和 DriverRunner7.1 Master 對 Driver 資源分配7.2 Worker 運行 DriverRunner7.3 DriverRunner 建立並運行 DriverWrapper第8章 SparkContext 解析8.1 SparkContext 解析8.2 SparkContext 建立過程8.3 SparkContext 簡易結構與交互關係8.4 Master 對 Application 資源分配8.5 Worker 建立 Executor第9章 Job 提交和 Task 的拆分9.1 總體預覽9.2 Code 轉化爲初始 RDDs9.3 RDD 分解爲待執行任務集合(TaskSet)9.4 TaskSet 封裝爲 TaskSetManager 並提交至 Driver9.5 Driver 將 TaskSetManager 分解爲 TaskDescriptions 併發布任務到 Executor第10章 Task 執行和回饋10.1 Task 的執行流程10.2 Task 的回饋流程10.3 Task 的迭代流程10.4 精彩圖解第11章 Spark 的數據存儲11.1 存儲子系統概覽11.2 啓動過程分析11.3 通訊層11.4 存儲層11.4.1 Disk Store11.4.2 Memory Store11.5 數據寫入過程分析11.5.1 序列化與否11.6 數據讀取過程分析11.6.1 本地讀取11.6.2 遠程讀取11.7 Partition 如何轉化爲 Block11.8 partition 和 block 的對應關係第12章 Spark Shuffle 過程12.1 MapReduce 的 Shuffle 過程介紹12.1.1 Spill 過程(刷寫過程)12.1.2 Merge12.1.3 Copy12.1.4 Merge Sort12.2 HashShuffle 過程介紹12.3 SortShuffle 過程介紹12.4 TungstenShuffle 過程介紹12.5 MapReduce 與 Spark 過程對比第13章 Spark 內存管理13.1 堆內和堆外內存規劃13.1.1 堆內內存13.1.2 堆外內存13.1.3 內存管理接口13.2 內存空間分配13.2.1 靜態內存管理13.2.2 統一內存管理13.3 存儲內存管理13.3.1 RDD 的持久化機制13.3.2 RDD 緩存的過程13.3.3 淘汰和落盤13.4 執行內存管理13.4.1 多任務間內存分配13.4.2 Shuffle 的內存佔用第14章 部署模式解析14.1 部署模式概述14.2 standalone 框架14.2.1 Standalone 模式下任務運行過程14.2.2 總結14.3 yarn 集羣模式14.4 mesos 集羣模式14.5 spark 三種部署模式的區別14.6 異常場景分析14.6.1 異常分析1:Worker 異常退出14.6.2 異常分析2:Executor 異常退出14.6.3 異常分析3:Master 異常退出第15章 wordcount 程序運行原理窺探15.1 spark 之 scala 實現 wordcount15.2 原理窺探vue


第1章 Spark 總體概述

1.1 總體概念

  Apache Spark 是一個開源的通用集羣計算系統,它提供了 High-level 編程 API,支持 Scala、Java 和 Python 三種編程語言。Spark 內核使用 Scala 語言編寫,經過基於 Scala 的函數式編程特性,在不一樣的計算層面進行抽象,代碼設計很是優秀java

1.2 RDD 抽象

  RDD(Resilient Distributed Datasets),彈性分佈式數據集,它是對分佈式數據集的一種內存抽象,經過受限的共享內存方式來提供容錯性,同時這種內存模型使得計算比傳統的數據流模型要高效。RDD 具備 5 個重要的特性,以下圖所示:node


上圖展現了 2 個 RDD 進行 JOIN 操做,體現了 RDD 所具有的 5 個主要特性,以下所示:
  • 1)一組分區
  • 2)計算每個數據分片的函數
  • 3)RDD 上的一組依賴
  • 4)可選,對於鍵值對 RDD,有一個 Partitioner(一般是 HashPartitioner)
  • 5)可選,一組 Preferred location 信息(例如,HDFS 文件的 Block 所在 location 信息)
有了上述特性,可以很是好地經過 RDD 來表達分佈式數據集,並做爲構建 DAG 圖的基礎:首先抽象一個分佈式計算任務的邏輯表示,最終將任務在實際的物理計算環境中進行處理執行。

 

1.3 計算抽象

在描述 Spark 中的計算抽象,咱們首先須要瞭解以下幾個概念:
1)Application
  • 用戶編寫的 Spark 程序,完成一個計算任務的處理。它是由一個 Driver 程序和一組運行於 Spark 集羣上的 Executor 組成。
2)Job
  • 用戶程序中,每次調用 Action 時,邏輯上會生成一個 Job,一個 Job 包含了多個 Stage 。
3)Stage
  • Stage 包括兩類:ShuffleMapStage 和 ResultStage,若是用戶程序中調用了須要進行 Shuffle 計算的 Operator,如 groupByKey 等,就會以 Shuffle 爲邊界分紅 ShuffleMapStage 和 ResultStage。
4)TaskSet
  • 基於 Stage 能夠直接映射爲 TaskSet,一個 TaskSet 封裝了一次須要運算的、具備相同處理邏輯的 Task,這些 Task 能夠並行計算,粗粒度的調度是以 TaskSet 爲單位的。
5)Task
  • Task 是在物理節點上運行的基本單位,Task 包含兩類:ShuffleMapTask 和 ResultTask,分別對應於 Stage 中 ShuffleMapStage 和 ResultStage 中的一個執行基本單元。
下面,咱們看一下,上面這些基本概念之間的關係,以下圖所示:python


  上圖,爲了簡單,每一個 Job 假設都很簡單,而且只須要進行一次 Shuffle 處理,因此都對應 2 個 Stage。實際應用中,一個 Job 可能包含若干個 Stage,或者是一個相對複雜的 Stage DAG。
在 Standalone 模式下,默認使用的是 FIFO 這種簡單的調度策略,在進行調度的過程當中,大概流程以下圖所示:

  從用戶提交 Spark 程序,最終生成 TaskSet,而在調度時,經過 TaskSetManager 來管理一個 TaskSet(包含一組可在物理節點上執行的 Task),這裏面 TaskSet 必需要按照順序執行才能保證計算結果的正確性,由於 TaskSet 之間是有序依賴的(上溯到 ShuffleMapStage 和 ResultStage),只有一個 TaskSet 中的全部 Task 都運行完成後,才能調度下一個 TaskSet 中的 Task 去執行。

 

1.4 集羣模式

  Spark 集羣在設計的時候,並無在資源管理的設計上對外封閉,而是充分考慮了將來對接一些更強大的資源管理系統,如 YARN、Mesos 等,因此 Spark 架構設計將資源管理單獨抽象出一層,經過這種抽象可以構建一種適合企業當前技術棧的插件式資源管理模塊,從而爲不一樣的計算場景提供不一樣的資源分配與調度策略。Spark 集羣模式架構,以下圖所示:web


上圖中,Spark集羣Cluster Manager目前支持以下三種模式:
1)Standalone 模式
  • Standalone 模式是 Spark 內部默認實現的一種集羣管理模式,這種模式是經過集羣中的 Master 來統一管理資源,而與 Master 進行資源請求協商的是 Driver 內部的 StandaloneSchedulerBackend(其實是其內部的 StandaloneAppClient 真正與 Master 通訊),後面會詳細說明。
2)YARN 模式
  • YARN 模式下,能夠將資源的管理統一交給 YARN 集羣的 ResourceManager 去管理,選擇這種模式,能夠更大限度的適應企業內部已有的技術棧,若是企業內部已經在使用 Hadoop 技術構建大數據處理平臺。
3)Mesos 模式
  • 隨着 Apache Mesos 的不斷成熟,一些企業已經在嘗試使用 Mesos 構建數據中心的操做系統(DCOS),Spark 構建在 Mesos 之上,可以支持細粒度、粗粒度的資源調度策略(Mesos 的優點),也能夠更好地適應企業內部已有技術棧。
  • 那麼,Spark 中是怎麼考慮知足這一重要的設計決策的呢?也就是說,如何可以保證 Spark 很是容易的讓第三方資源管理系統輕鬆地接入進來。咱們深刻到類設計的層面看一下,以下類圖所示:

  • 能夠看出,Task 調度直接依賴 SchedulerBackend,SchedulerBackend 與實際資源管理模塊交互實現資源請求。這裏面,CoarseGrainedSchedulerBackend 是 Spark 中與資源調度相關的最重要的抽象,它須要抽象出與 TaskScheduler 通訊的邏輯,同時還要可以與各類不一樣的第三方資源管理系統無縫地交互。實際上,CoarseGrainedSchedulerBackend 內部採用了一種 ResourceOffer 的方式來處理資源請求。

 

1.5 RPC 網絡通訊抽象

  Spark RPC 層是基於優秀的網絡通訊框架 Netty 設計開發的,可是 Spark 提供了一種很好地抽象方式,將底層的通訊細節屏蔽起來,並且也可以基於此來設計知足擴展性,好比,若是有其餘不基於 Netty 的網絡通訊框架的新的RPC接入需求,能夠很好地擴展而不影響上層的設計。RPC 層設計,以下圖類圖所示:算法


  任何兩個 Endpoint 只能經過消息進行通訊,能夠實現一個 RpcEndpoint 和一個 RpcEndpointRef。想要與 RpcEndpoint 通訊,須要獲取到該 RpcEndpoint 對應的 RpcEndpointRef 便可,並且管理 RpcEndpoint 和 RpcEndpointRef 建立及其通訊的邏輯,統一在 RpcEnv 對象中管理。

 

1.6 啓動 Standalone 集羣

  Standalone 模式下,Spark 集羣採用了簡單的 Master-Slave 架構模式,Master 統一管理全部的 Worker,這種模式很常見,咱們簡單地看下 Spark Standalone 集羣啓動的基本流程,以下圖所示:shell


能夠看到,Spark 集羣採用的消息的模式進行通訊,也就是 EDA 架構模式,藉助於 RPC 層的優雅設計,任何兩個 Endpoint 想要通訊,發送消息並攜帶數據便可。上圖的流程描述以下所示:
  • 1)Master 啓動時首先創一個 RpcEnv 對象,負責管理全部通訊邏輯。
  • 2)Master 經過 RpcEnv 對象建立一個 Endpoint,Master 就是一個 Endpoint,Worker 能夠與其進行通訊。
  • 3)Worker 啓動時也是創一個 RpcEnv 對象。
  • 4)Worker 經過 RpcEnv 對象建立一個 Endpoint。
  • 5)Worker 經過 RpcEnv 對,創建到 Master 的鏈接,獲取到一個 RpcEndpointRef 對象,經過該對象能夠與 Master 通訊。
  • 6)Worker 向 Master 註冊,註冊內容包括主機名、端口、CPU Core 數量、內存數量。
  • 7)Master 接收到 Worker 的註冊,將註冊信息維護在內存中的 Table 中,其中還包含了一個到 Worker 的 RpcEndpointRef 對象引用。
  • 8)Master 回覆 Worker 已經接收到註冊,告知 Worker 已經註冊成功。
  • 9)此時若是有用戶提交 Spark 程序,Master 須要協調啓動 Driver;而 Worker 端收到成功註冊響應後,開始週期性向 Master 發送心跳。

 

1.7 核心組件

  集羣處理計算任務的運行時(即用戶提交了 Spark 程序),最核心的頂層組件就是 Driver 和 Executor,它們內部管理不少重要的組件來協同完成計算任務,核心組件棧以下圖所示:apache


  Driver 和 Executor 都是運行時建立的組件,一旦用戶程序運行結束,他們都會釋放資源,等待下一個用戶程序提交到集羣而進行後續調度。上圖,咱們列出了大多數組件,其中 SparkEnv 是一個重量級組件,他們內部包含計算過程當中須要的主要組件,並且,Driver 和 Executor 共同須要的組件在 SparkEnv 中也包含了不少。這裏,咱們不作過多詳述,後面交互流程等處會說明大部分組件負責的功能。

 

1.8 核心組件交互流程

  在 Standalone 模式下,Spark 中各個組件之間交互仍是比較複雜的,可是對於一個通用的分佈式計算系統來講,這些都是很是重要並且比較基礎的交互。首先,爲了理解組件之間的主要交互流程,咱們給出一些基本要點:
  • 一個 Application 會啓動一個 Driver
  • 一個 Driver 負責跟蹤管理該 Application 運行過程當中全部的資源狀態和任務狀態
  • 一個 Driver 會管理一組 Executor
  • 一個 Executor 只執行屬於一個 Driver 的 Task
核心組件之間的主要交互流程,以下圖所示:編程


上圖中,經過不一樣顏色或類型的線條,給出了以下 6 個核心的交互流程,咱們會詳細說明:
橙色:提交用戶 Spark 程序
用戶提交一個 Spark 程序,主要的流程以下所示:
  •1)用戶 spark-submit 腳本提交一個 Spark 程序,會建立一個 ClientEndpoint 對象,該對象負責與 Master 通訊交互
  •2)ClientEndpoint 向 Master 發送一個 RequestSubmitDriver 消息,表示提交用戶程序
  •3)Master 收到 RequestSubmitDriver 消息,向 ClientEndpoint 回覆 SubmitDriverResponse,表示用戶程序已經完成註冊
  •4)ClientEndpoint 向 Master 發送 RequestDriverStatus 消息,請求 Driver 狀態
  •5)若是當前用戶程序對應的 Driver 已經啓動,則 ClientEndpoint 直接退出,完成提交用戶程序
紫色:啓動 Driver 進程
當用戶提交用戶 Spark 程序後,須要啓動 Driver 來處理用戶程序的計算邏輯,完成計算任務,這時 Master 協調須要啓動一個 Driver,具體流程以下所示:
  •1)Maser 內存中維護着用戶提交計算的任務 Application,每次內存結構變動都會觸發調度,向 Worker 發送 LaunchDriver 請求
  •2)Worker 收到 LaunchDriver 消息,會啓動一個 DriverRunner 線程去執行 LaunchDriver 的任務
  •3)DriverRunner 線程在 Worker 上啓動一個新的 JVM 實例,該 JVM 實例內運行一個 Driver 進程,該 Driver 會建立 SparkContext 對象
紅色:註冊 Application
Dirver 啓動之後,它會建立 SparkContext 對象,初始化計算過程當中必需的基本組件,並向 Master 註冊 Application,流程描述以下:
  •1)建立 SparkEnv 對象,建立並管理一些數基本組件
  •2)建立 TaskScheduler,負責 Task 調度
  •3)建立 StandaloneSchedulerBackend,負責與 ClusterManager 進行資源協商
  •4)建立 DriverEndpoint,其它組件能夠與 Driver 進行通訊
  •5)在 StandaloneSchedulerBackend 內部建立一個 StandaloneAppClient,負責處理與 Master 的通訊交互
  •6)StandaloneAppClient 建立一個 ClientEndpoint,實際負責與 Master 通訊
  •7)ClientEndpoint 向 Master 發送 RegisterApplication 消息,註冊 Application
  •8)Master 收到 RegisterApplication 請求後,回覆 ClientEndpoint 一個 RegisteredApplication 消息,表示已經註冊成功
藍色:啓動 Executor 進程
  •1)Master 向 Worker 發送 LaunchExecutor 消息,請求啓動 Executor;同時 Master 會向 Driver 發送 ExecutorAdded 消息,表示 Master 已經新增了一個 Executor(此時還未啓動)
  •2)Worker 收到 LaunchExecutor 消息,會啓動一個 ExecutorRunner 線程去執行 LaunchExecutor 的任務
  •3)Worker 向 Master 發送 ExecutorStageChanged 消息,通知 Executor 狀態已發生變化
  •4)Master 向 Driver 發送 ExecutorUpdated 消息,此時 Executor 已經啓動
粉色:啓動 Task 執行
  •1)StandaloneSchedulerBackend 啓動一個 DriverEndpoint
  •2)DriverEndpoint 啓動後,會週期性地檢查 Driver 維護的 Executor 的狀態,若是有空閒的 Executor 便會調度任務執行
  •3)DriverEndpoint 向 TaskScheduler 發送 Resource Offer 請求
  •4)若是有可用資源啓動 Task,則 DriverEndpoint 向 Executor 發送 LaunchTask 請求
  •5)Executor 進程內部的 CoarseGrainedExecutorBackend 調用內部的 Executor 線程的 launchTask 方法啓動 Task
  •6)Executor 線程內部維護一個線程池,建立一個 TaskRunner 線程並提交到線程池執行
綠色:Task 運行完成
  •1)Executor 進程內部的 Executor 線程通知 CoarseGrainedExecutorBackend,Task 運行完成
  •2)CoarseGrainedExecutorBackend 向 DriverEndpoint 發送 StatusUpdated 消息,通知 Driver 運行的 Task 狀態發生變動
  •3)StandaloneSchedulerBackend 調用 TaskScheduler 的 updateStatus 方法更新 Task 狀態
  •4)StandaloneSchedulerBackend 繼續調用 TaskScheduler 的 resourceOffers 方法,調度其餘任務運行

 

1.9 Block 管理

  Block 管理,主要是爲 Spark 提供的 Broadcast 機制提供服務支撐的。Spark 中內置採用 TorrentBroadcast 實現,該 Broadcast 變量對應的數據(Task 數據)或數據集(如 RDD),默認會被切分紅若干 4M 大小的 Block,Task 運行過程當中讀取到該 Broadcast 變量,會以 4M 爲單位的 Block 爲拉取數據的最小單位,最後將全部的 Block 合併成 Broadcast 變量對應的完整數據或數據集。將數據切分紅 4M 大小的 Block,Task 從多個 Executor 拉取 Block,能夠很是好地均衡網絡傳輸負載,提升整個計算集羣的穩定性。
  一般,用戶程序在編寫過程當中,會對某個變量進行 Broadcast,該變量稱爲 Broadcast 變量。在實際物理節點的 Executor 上執行 Task 時,須要讀取 Broadcast 變量對應的數據集,那麼此時會根據須要拉取 DAG 執行流上游已經生成的數據集。採用 Broadcast 機制,能夠有效地下降數據在計算集羣環境中傳輸的開銷。具體地,若是一個用戶對應的程序中的 Broadcast 變量,對應着一個數據集,它在計算過程當中須要拉取對應的數據,若是在同一個物理節點上運行着多個 Task,多個 Task 都須要該數據,有了 Broadcast 機制,只須要拉取一份存儲在本地物理機磁盤便可,供多個 Task 計算共享。
  另外,用戶程序在進行調度過程當中,會根據調度策略將 Task 計算邏輯數據(代碼)移動到對應的 Worker 節點上,最優狀況是對本地數據進行處理,那麼代碼(序列化格式)也須要在網絡上傳輸,也是經過 Broadcast 機制進行傳輸,不過這種方式是首先將代碼序列化到 Driver 所在 Worker 節點,後續若是 Task 在其餘 Worker 中執行,須要讀取對應代碼的 Broadcast 變量,首先就是從 Driver 上拉取代碼數據,接着其餘晚一些被調度的 Task 可能直接從其餘 Worker 上的 Executor 中拉取代碼數據。
  咱們經過以 Broadcast 變量 taskBinary 爲例,說明 Block 是如何管理的,以下圖所示:設計模式


  上圖中,Driver 負責管理全部的 Broadcast 變量對應的數據所在的 Executor,即一個 Executor 維護一個 Block 列表。在 Executor 中運行一個 Task 時,執行到對應的 Broadcast 變量 taskBinary,若是本地沒有對應的數據,則會向 Driver 請求獲取 Broadcast 變量對應的數據,包括一個或多個 Block 所在的 Executor 列表,而後該 Executor 根據 Driver 返回的 Executor 列表,直接經過底層的 BlockTransferService 組件向對應 Executor 請求拉取 Block。Executor 拉取到的 Block 會緩存到本地,同時向 Driver 報告該 Executor 上存在的 Block 信息,以供其餘 Executor 執行 Task 時獲取 Broadcast 變量對應的數據。

 

1.10總體應用

  用戶經過 spark-submit 提交或者運行 spark-shell REPL,集羣建立 Driver,Driver 加載 Application,最後 Application 根據用戶代碼轉化爲 RDD,RDD 分解爲 Tasks,Executor 執行 Task 等系列知識,總體交互藍圖以下:

第2章 Spark 通訊架構

  Spark做爲分佈式計算框架,多個節點的設計與相互通訊模式是其重要的組成部分。Spark 一開始使用 Akka 做爲內部通訊部件。在 Spark 1.3 年代,爲了解決大塊數據(如 Shuffle)的傳輸問題,Spark 引入了 Netty 通訊框架。到了 Spark 1.6,Spark 能夠配置使用 Akka 或者 Netty 了,這意味着 Netty 能夠徹底替代 Akka了。再到 Spark 2,Spark 已經徹底拋棄 Akka了,所有使用 Netty 了。
  爲何呢?官方的解釋是:
  •1)不少 Spark 用戶也使用 Akka,可是因爲 Akka 不一樣版本之間沒法互相通訊,這就要求用戶必須使用跟 Spark 徹底同樣的 Akka 版本,致使用戶沒法升級 Akka。
  •2)Spark 的 Akka 配置是針對 Spark 自身來調優的,可能跟用戶本身代碼中的 Akka 配置衝突。
  •3)Spark 用的 Akka 特性不多,這部分特性很容易本身實現。同時,這部分代碼量相比 Akka 來講少不少,debug 比較容易。若是遇到什麼 bug,也能夠本身立刻 fix,不須要等 Akka 上游發佈新版本。並且,Spark 升級 Akka 自己又由於第一點會強制要求用戶升級他們使用的 Akka,對於某些用戶來講是不現實的。
SPARK 的通訊架構 - Actor 比較,以下圖所示:

2.1 通訊組件概覽

對源碼分析,對於設計思路理解以下:


  •1)RpcEndpoint:RPC 端點,Spark 針對於每一個節點(Client/Master/Worker)都稱之一個 Rpc 端點且都實現 RpcEndpoint 接口,內部根據不一樣端點的需求,設計不一樣的消息和不一樣的業務處理,若是須要發送(詢問)則調用 Dispatcher。
  •2)RpcEnv:RPC 上下文環境,每一個 Rpc 端點運行時依賴的上下文環境稱之爲 RpcEnv。
  •3)Dispatcher:消息分發器,針對於 RPC 端點須要發送消息或者從遠程 RPC 接收到的消息,分發至對應的指令收件箱/發件箱。若是指令接收方是本身存入收件箱,若是指令接收方爲非自身端點,則放入發件箱。
  •4)Inbox:指令消息收件箱,一個本地端點對應一個收件箱,Dispatcher 在每次向 Inbox 存入消息時,都將對應 EndpointData 加入內部待 Receiver Queue中,另外 Dispatcher 建立時會啓動一個單獨線程進行輪詢 Receiver Queue,進行收件箱消息消費。
  •5)OutBox:指令消息發件箱,一個遠程端點對應一個發件箱,當消息放入 Outbox 後,緊接着將消息經過 TransportClient 發送出去。消息放入發件箱以及發送過程是在同一個線程中進行,這樣作的主要緣由是遠程消息分爲 RpcOutboxMessage,OneWayOutboxMessage 兩種消息,而針對於須要應答的消息直接發送且須要獲得結果進行處理
  •6)TransportClient:Netty 通訊客戶端,根據 OutBox 消息的 receiver 信息,請求對應遠程 TransportServer。
  •7)TransportServer:Netty 通訊服務端,一個 RPC 端點一個 TransportServer,接受遠程消息後調用 Dispatcher 分發消息至對應收發件箱。
注意
  TransportClient 與 TransportServer 通訊虛線表示兩個 RpcEnv 之間的通訊,圖示沒有單獨表達式。
  一個 Outbox 一個 TransportClient,圖示沒有單獨表達式。
  一個 RpcEnv 中存在兩個 RpcEndpoint,一個表明自己啓動的 RPC 端點,另一個爲 RpcEndpointVerifier。
Spark的通訊架構 – 高層視圖

Spark 的通訊架構 – 類圖

2.2 Endpoint 啓動過程

啓動的流程以下:


Endpoint 啓動後,默認會向 Inbox 中添加 OnStart 消息,不一樣的端點(Master/Worker/Client)消費 OnStart 指令時,進行相關端點的啓動額外處理。
Endpoint 啓動時,會默認啓動 TransportServer,且啓動結束後會進行一次同步測試 rpc 可用性(askSync-BoundPortsRequest)。
Dispatcher 做爲一個分發器,內部存放了 Inbox,Outbox 的等相關句柄和存放了相關處理狀態數據,結構大體以下:

2.3 Endpoint Send&Ask 流程

Endpoint 的消息發送與請求流程,以下:


Endpoint 根據業務須要存入兩個維度的消息組合:send/ask 某個消息,receiver 是自身與非自身
  •1)OneWayMessage:send + 自身,直接存入收件箱
  •2)OneWayOutboxMessage:send + 非自身,存入發件箱並直接發送
  •3)RpcMessage:ask + 自身,直接存入收件箱,另外還須要存入 LocalNettyRpcCallContext,須要回調後再返回
  •4)RpcOutboxMessage:ask + 非自身,存入發件箱並直接發送,須要回調後再返回

 

2.4 Endpoint Receive 流程

Endpoint 的消息的接收,流程以下:


上圖 ServerBootstrap 爲 Netty 啓動服務,SocketChanel爲Netty 數據通道。
上述包含 TransportSever 啓動與消息接收兩個流程。

 

2.5 Endpoint Inbox 處理流程

Spark 在 Endpoint 的設計上核心設計即爲 Inbox 與 Outbox,其中 Inbox 核心要點爲:
  •1)內部的處理流程拆分爲多個消息指令(InboxMessage)存放入 Inbox。
  •2)當 Dispatcher 啓動最後,會啓動一個名爲【dispatcher-event-loop】的線程掃描 Inbox 待處理 InboxMessage,並調用 Endpoint 根據 InboxMessage 類型作相應處理
  •3)當 Dispatcher 啓動最後,默認會向 Inbox 存入 OnStart 類型的 InboxMessage,Endpoint 在根據 OnStart 指令作相關的額外啓動工做,三端啓動後全部的工做都是對 OnStart 指令處理衍生出來的,所以能夠說 OnStart 指令是相互通訊的源頭。


消息指令類型大體以下三類:
  •1)OnStart/OnStop
  •2)RpcMessage/OneWayMessage
  •3)RemoteProcessDisconnected/RemoteProcessConnected/RemoteProcessConnectionError

2.6 Endpoint 畫像

第3章 腳本解析

在看源碼以前,咱們通常會看相關腳本瞭解其初始化信息以及 Bootstrap 類,Spark 也不例外,而 Spark 中相關的腳本以下:

%SPARK_HOME%/sbin/start-master.sh
%SPARK_HOME%/sbin/start-slaves.sh
%SPARK_HOME%/sbin/start-all.sh
%SPARK_HOME%/bin/spark-submit

啓動腳本中對於公共處理部分進行抽取爲獨立的腳本,以下:

腳本 說明
sbin/spark-config.sh 初始化環境變量 SPARK_CONF_DIR, PYTHONPATH
bin/load-spark-env.sh 初始化環境變量 SPARK_SCALA_VERSION,調用 %SPARK_HOME%
conf/spark-env.sh 加載用戶自定義環境變量

3.1 start-daemon.sh

主要完成進程相關基本信息初始化,而後調用 bin/spark-class 進行守護進程啓動,該腳本是建立端點的通用腳本,三端各自腳本都會調用 spark-daemon.sh 腳本啓動各自進程

詳解以下:

1)初始化 SPRK_HOME、SPARK_CONF_DIR、SPARK_IDENT_STRING、SPARK_LOG_DIR 環境變量 (若是不存在)
2)初始化日誌並測試日誌文件夾讀寫權限,初始化 PID 目錄並校驗 PID 信息
3)調用 /bin/spark-class 腳本,/bin/spark-class 見下面

3.2 spark-class

Master 調用舉例:

bin/spark-class \
--class org.apache.spark.deploy.master.Master \
--host $SPARK_MASTER_HOST \
--port $SPARK_MASTER_PORT \
--webui-port $SPARK_MASTER_WEBUI_PORT $ORIGINAL_ARGS

1)初始化 RUNNER(java)、SPARK_JARS_DIR (%SPARK_HOME%/jars)、LAUNCH_CLASSPATH 信息
2)調用 ("$RUNNER" -Xmx128m -cp "$LAUNCH_CLASSPATHorg.apache.spark.launcher.Main "$@") 獲取最終執行的 shell 語句
3)執行最終的 shell 語句,示例以下:

/opt/module/jdk1.8.0_144 \
-cp /opt/module/spark-2.1.1-bin-hadoop2.7/conf/:/opt/module/spark-2.1.1-bin-hadoop2.7/jars/*:/opt/module/hadoop-2.7.2/etc/hadoop/ \
-Xmx1g \
-XX:MaxPermSize
=256m \
org.apache.spark.deploy.master.Master \
--host hadoop102 \
--port 7077 \
--webui-port 8080

若是是 Client,那麼可能爲 r,或者 python 腳本。

3.3 start-master.sh

啓動 Master 的腳本,流程以下:

詳解以下:

1)用戶執行 start-master.sh 腳本,初始化環境變量 SPARK_HOME (若是 PATH 不存在 SPARK_HOME,初始化腳本的上級目錄爲 SPARK_HOME),調用 spark-config.sh,調用 load-spark-env.sh
2)若是環境變量 SPARK_MASTER_HOST、SPARK_MASTER_PORT、SPARK_MASTER_WEBUI_PORT 不存在,進行初始化 7077,hostname -f,8080
3)調用 spark-daemon.sh 腳本啓動 master 進程,以下:

spark-daemon.sh start org.apache.spark.deploy.master.Master 1 \
--host $SPARK_MASTER_HOST \
--port $SPARK_MASTER_PORT \
--webui-port $SPARK_MASTER_WEBUI_PORT $ORIGINAL_ARGS)

3.4 start-slaves.sh

啓動 Worker 的腳本,流程以下:

詳解以下:

1)用戶執行 start-slaves.sh 腳本,初始化環境變量 SPARK_HOME,調用 spark-config.sh,調用 load-spark-env.sh,初始化 Master host/port 信息
2)調用 slaves.sh 腳本,讀取 conf/slaves 文件並遍歷,經過 ssh 鏈接到對應 slave 節點,啓動 ${SPARK_HOME}/sbin/start-slave.sh spark://$SPARK_MASTER_HOST:$SPARK_MASTER_PORT
3)start-slave.sh 在各個節點中,初始化環境變量 SPARK_HOME,調用 spark-config.sh,調用 load-spark-env.sh,根據 $SPARK_WORKER_INSTANCES 計算 WEBUI_PORT 端口 (worker 端口號依次遞增) 並啓動 Worker 進程,以下:

${SPARK_HOME}/sbin/spark-daemon.sh \
start org.apache.spark.deploy.worker.Worker $WORKER_NUM \
--webui-port "$WEBUI_PORT" $PORT_FLAG $PORT_NUM $MASTER "$@"

3.5 start-all.sh

屬於快捷腳本,內部調用 start-master.sh 與 start-slaves.sh 腳本,並沒有額外工做。

3.6 spark-submit

任務提交的基本腳本,流程以下:

詳解以下:

1)直接調用 spark-class 腳本進行進程建立,示例以下:

./spark-submit \
--class org.apache.spark.examples.SparkPi \
--master spark://hadoop102:7077 \
../examples/jars/spark-examples_2.11-2.1.0.jar 10

2)若是是 java/scala 任務,那麼最終調用 SparkSubmit.scala 進行任務處理,示例以下:

/opt/module/jdk1.8.0_144 -cp \
/opt/module/spark-2.1.1-bin-hadoop2.7/conf/:/opt/module/spark-2.1.1-bin-hadoop2.7/jars/*:/opt/module/hadoop-2.7.2/etc/hadoop/ \
-Xmx1g -XX:MaxPermSize
=256m \
org.apache.spark.deploy.SparkSubmit \
--master spark://hadoop102:7077 \
--class org.apache.spark.examples.SparkPi \
../examples/jars/spark-examples_2.11-2.1.0.jar 10

第4章 Master 節點啓動

Master 做爲 Endpoint 的具體實例,下面咱們介紹一下 Master 啓動以及 OnStart 指令後的相關工做。

4.1 腳本概覽

下面是一個舉例:

/opt/module/jdk1.8.0_144 \
-cp /opt/module/spark-2.1.1-bin-hadoop2.7/conf/:/opt/module/spark-2.1.1-bin-hadoop2.7/jars/*:/opt/module/hadoop-2.7.2/etc/hadoop/ \
-Xmx1g \
-XX:MaxPermSize=256m \
org.apache.spark.deploy.master.Master \
--host hadoop102 \
--port 7077 \

4.2 啓動流程

Master 的啓動流程以下:

詳解以下:

1)SparkConf:加載 key 以 spark. 開頭的系統屬性 (Utils.getSystemProperties)。
2)MasterArguments:
    a) 解析 Master 啓動的參數:
        --ip -i --host -h --port -p --webui-port --properties-file
    b)將 --properties-file (沒有配置默認爲 conf/spark-defaults.conf) 中以 spark. 開頭的配置存入 SparkConf。
3)NettyRpcEnv 中的內部處理遵循 RpcEndpoint 統一處理,這裏再也不贅述。
4)BoundPortsResponse 返回 rpcEndpointPort、webUIPort、restPort 真實端口。
5)最終守護進程會一直存在等待結束信 awaitTermination。

4.3 OnStart 監聽事件

Master 的啓動完成後異步執行工做以下:

詳解以下:

1)【dispatcher-event-loop】線程掃描到 OnStart 指令後會啓動相關 MasterWebUI (默認端口 8080),根據配置選擇安裝 ResetServer (默認端口 6066)。
2)另外新起【master-forward-message-thread】線程按期檢查 Worker 心跳是否超時。
3)若是 Worker 心跳檢測超時,那麼對 Worker 下的發佈的全部任務所屬 Driver 進行 ExecutorUpdated 發送,同時本身再從新 LaunchDriver。

4.4 RpcMessage 處理 (receiveAndReply)

4.5 OneWayMessage 處理 (receive)

4.6 Master 對 RpcMessage/OneWayMessage 處理邏輯

這部分對總體 Master 理解做用不是很大且理解比較抽象,能夠先讀後續內容,回頭再考慮看這部份內容,或者不讀。

第5章 Worker 節點啓動

Worker 做爲 Endpoint 的具體實例,下面咱們介紹一下 Worker 啓動以及 OnStart 指令後的額外工做。

5.1 腳本概覽

下面是一個舉例:

/opt/module/jdk1.8.0_144 \
-cp /opt/module/spark-2.1.1-bin-hadoop2.7/conf/:/opt/module/spark-2.1.1-bin-hadoop2.7/jars/*:/opt/module/hadoop-2.7.2/etc/hadoop/ \
-Xmx1g \
-XX:MaxPermSize=256m \
org.apache.spark.deploy.worker.Worker \
--webui-port 8081
spark://hadoop102:7077

5.2 啓動流程

Worker 的啓動流程以下:

詳解以下:

1)SparkConf:加載 key 以 spark. 開頭的系統屬性 (Utils.getSystemProperties)。
2)WorkerArguments:
    a) 解析 Master 啓動的參數:
        --ip -i --host -h --port -p --cores -c --memory -m --work-dir --webui-port --properties-file
    b) 將 --properties-file (沒有配置默認爲conf/spark-defaults.conf) 中以 spark. 開頭的配置存入 SparkConf。
    c) 在沒有配置狀況下,cores 默認爲服務器 CPU 核數。
    d) 在沒有配置狀況下,memory 默認爲服務器內存減 1G,若是低於 1G 取 1G。
    e) webUiPort 默認爲 8081
3)NettyRpcEnv 中的內部處理遵循 RpcEndpoint 統一處理,這裏再也不贅述。
4)最終守護進程會一直存在等待結束信 awaitTermination。

5.3 OnStart 監聽事件

Worker 的啓動完成後異步執行工做以下:

詳解以下:

1)【dispatcher-event-loop】線程掃描到 OnStart 指令後會啓動相關 WorkerWebUI (默認端口 8081)。
2)Worker 向 Master 發起一次 RegisterWorker 指令。
3)另起【master-forward-message-thread】線程按期執行 ReregisterWithMaster 任務,若是註冊成功 (RegisteredWorker) 則跳過,不然再次向 Master 發起 RegisterWorker 指令,直到超過最大次數報錯 (默認16次)。
4)Master 若是能夠註冊,則維護對應的 WorkerInfo 對象並持久化,完成後向 Worker 發起一條 RegisteredWorker 指令,若是 Master 爲 standby 狀態,則向 Worker 發起一條 MasterInStandby 指令。
5)Worker 接受 RegisteredWorker 後,提交【master-forward-message-thread】線程按期執行 SendHeartbeat 任務,完成後向 Worker 發起一條 WorkerLatestState 指令。
6)Worker 發心跳檢測,會觸發更新 Master 對應 WorkerInfo 對象,若是 Master 檢測到異常,則發起 ReconnectWorker 指令至 Worker,Worker 則再次執行 ReregisterWithMaster 工做。

5.4 RpcMessage 處理 (receiveAndReply)

5.5 OneWayMessage 處理 (receive)

第6章 Client 啓動流程

Client 做爲 Endpoint 的具體實例,下面咱們介紹一下 Client 啓動以及 OnStart 指令後的額外工做。

6.1 腳本概覽

下面是一個舉例:

/opt/module/jdk1.8.0_144 \
-cp /opt/module/spark-2.1.1-bin-hadoop2.7/conf/:/opt/module/spark-2.1.1-bin-hadoop2.7/jars/*:/opt/module/hadoop-2.7.2/etc/hadoop/ \
-Xmx1g
-XX:MaxPermSize=256m
org.apache.spark.deploy.SparkSubmit
--master spark://hadoop102:7077
--class org.apache.spark.examples.SparkPi
../examples/jars/spark-examples_2.11-2.1.0.jar 10

6.2 SparkSubmit 啓動流程

SparkSubmit 的啓動流程以下:

詳解以下:

1)SparkSubmitArguments:
    a) 解析 Client 啓動的參數
        --name --master --class --deploy-mode
        --num-executors --executor-cores --total-executor-cores --executor-memory
        --driver-memory --driver-cores --driver-class-path --driver-java-options --driver-library-path
        --properties-file
        --kill --status --supervise --queue
        --files --py-files
        --archives --jars --packages --exclude-packages --repositories
        --conf (解析存入 MapsparkProperties 中)
        --proxy-user --principal --keytab --help --verbose --version --usage-error
    b) 合併 --properties-file (沒有配置默認爲 conf/spark-defaults.conf) 文件配置項 (不在 --conf 中的配置 ) 至 sparkProperties
    c) 刪除 sparkProperties 中不以 spark. 開頭的配置項目
    d) 啓動參數爲空的配置項從 sparkProperties 中合併
    e) 根據 action (SUBMITKILLREQUEST_STATUS) 校驗各自必需參數是否有值

2)Case Submit
    a) 獲取childMainClass
        [--deploy-mode
= clent(默認):用戶任務啓動類 mainClass (--class)
        [--deploy-mode
= cluster &  [--master] = spark:* & useRest     org.apache.spark.deploy.rest.RestSubmissionClient
        [--deploy-mode] = cluster &  [--master] = spark:* & !useRest    org.apache.spark.deploy.Client
        [--deploy-mode] = cluster &  [--master] = yarn                  org.apache.spark.deploy.yarn.Client
        [--deploy-mode] = cluster &  [--master] = mesos:*               org.apache.spark.deploy.rest.RestSubmissionClient
    b) 獲取 childArgs (子運行時對應命令行組裝參數)
        [--deploy-mode] = cluster &  [--master] = spark:* & useRest     包含 primaryResource 與 mainClass
        [--deploy-mode] = cluster &  [--master] = spark:* & !useRest    包含 --supervise --memory --cores  launch childArg, primaryResource, mainClass
        [--deploy-mode] = cluster &  [--master] = yarn                  --class --arg --jar/--primary-py-file/--primary-r-file
        [--deploy-mode
= cluster &  [--master] = mesos:*               primaryResource
    c) 獲取 childClasspath
        [--deploy-mode] = clent     讀取 --jars 配置,與 primaryResource 信息 (../examples/jars/spark-examples_2.11-2.1.0.jar)
    d) 獲取 sysProps
        將 sparkPropertie 中的全部配置封裝成新的 sysProps 對象,另外還增長了一下額外的配置項目
    e) 將 childClasspath 經過當前的類加載器加載中
    f) 將 sysProps 設置到當前 jvm 環境中
    g) 最終反射執行 childMainClass,傳參爲 childArgs

6.3 Client 啓動流程

Client 的啓動流程以下:

詳解以下:

1)SparkConf:加載 key 以 spark. 開頭的系統屬性 (Utils.getSystemProperties)。
2)ClientArguments:
    a) 解析 Client 啓動的參數:
        --cores -c --memory -m --supervise -s --verbose -v
        launch jarUrl master mainClass
        kill master driverId
    b) 將 --properties-file (沒有配置默認爲 conf/spark-defaults.conf) 中以 spark. 開頭的配置存入 SparkConf。
    c) 在沒有配置狀況下,cores 默認爲 1 核。
    d) 在沒有配置狀況下,memory 默認爲 1G。
    e) NettyRpcEnv 中的內部處理遵循 RpcEndpoint 統一處理,這裏再也不贅述。
3)最終守護進程會一直存在等待結束信 awaitTermination。

6.4 Client 的 OnStart 監聽事件

Client 的啓動完成後異步執行工做以下: 

詳解以下:

1)若是是發佈任務(case launch),Client 建立一個 DriverDescription,並向 Master 發起 RequestSubmitDriver 請求。
    a) Command 中的 mainClass 爲: org.apache.spark.deploy.worker.DriverWrapper
    b) Command 中的 arguments 爲: Seq("{{WORKER_URL}}""{{USER_JAR}}", driverArgs.mainClass)
2)Master 接受 RequestSubmitDriver 請求後,將 DriverDescription 封裝爲 一個DriverInfo。
    a) startTime 與 submitDate 都爲當前時間
    b) driverId 格式爲:driver-yyyyMMddHHmmss-nextId,nextId 是全局惟一的
3)Master 持久化 DriverInfo,並加入待調度列表中 (waitingDrivers),觸發公共資源調度邏輯。
4)Master 公共資源調度結束後,返回 SubmitDriverResponse給Client。

6.5 RpcMessage 處理 (receiveAndReply)

無。

6.6 OneWayMessage 處理(receive)

第7章 Driver 和 DriverRunner

Client 向 Master 發起 RequestSubmitDriver 請求,Master 將 DriverInfo 添加待調度列表中 (waitingDrivers),下面針對於 Driver 進一步梳理。

7.1 Master 對 Driver 資源分配

大體流程以下:

詳解以下:

waitingDrivers 與 aliveWorkers 進行資源匹配:
1)在 waitingDrivers 循環內,輪詢全部 aliveWorker。
2)若是 aliveWorker 知足當前 waitingDriver 資源要求,給 Worker 發送 LaunchDriver 指令並將 waitingDriver 移除 waitingDrivers,則進行下一次 waitingDriver 的輪詢工做。
3)若是輪詢完全部 aliveWorker 都不知足 waitingDriver 資源要求,則進行下一次 waitingDriver 的輪詢工做。
4)全部發起的輪詢開始點都上次輪詢結束點的下一個點位開始。

7.2 Worker 運行 DriverRunner

Driver 的啓動,流程以下:

詳解以下:

1)當 Worker 遇到 LaunchDriver 指令時,建立並啓動一個 DriverRunner。
2)DriverRunner 啓動一個線程 DriverRunner for [driverId] 處理 Driver 啓動工做。
3)DriverRunner for [driverId]:
    a) 添加 JVM 鉤子,針對於每一個 diriverId 建立一個臨時目錄。
    b) 將 DriverDesc.jarUrl 經過 Netty 從 Driver 機器遠程拷貝過來。
    c) 根據 DriverDesc.command 模板構建本地執行的 command 命令,並啓動該 command 對應的 Process 進程。
    d) 將 Process 的輸出流輸出到文件 stdout/stderror,若是 Process 啓動失敗,進行 1-5 的秒的反覆啓動工做,直到啓動成功,在釋放 Worker 節點的 DriverRunner 的資源。

7.3 DriverRunner 建立並運行 DriverWrapper

DriverWrapper 的運行,流程以下:

詳解以下:

1)DriverWapper 建立了一個 RpcEndpoint 與 RpcEnv。
2)RpcEndpoint 爲 WorkerWatcher,主要目的爲監控 Worker 節點是否正常,若是出現異常就直接退出。
3)而後當前的 ClassLoader 加載 userJar,同時執行 userMainClass。
4)執行用戶的 main 方法後關閉 workerWatcher。

第8章 SparkContext 解析

8.1 SparkContext 解析

SparkContext 是用戶通往 Spark 集羣的惟一入口,任何須要使用 Spark 的地方都須要先建立 SparkContext,那麼 SparkContext 作了什麼?
首先 SparkContext 是在 Driver 程序裏面啓動的,能夠看作 Driver 程序和 Spark 集羣的一個鏈接,SparkContext 在初始化的時候,建立了不少對象,以下圖所示:


上圖列出了 SparkContext 在初始化建立的時候的一些主要組件的構建。

8.2 SparkContext 建立過程

詳解以下:

SparkContext 在新建時:
1)內部建立一個 SparkEnv,SparkEnv 內部建立一個 RpcEnv。
    a) RpcEnv 內部建立並註冊一個 MapOutputTrackerMasterEndpoint(該 Endpoint 暫不介紹)
2)接着建立 DAGScheduler、TaskSchedulerImpl、SchedulerBackend。
    a) TaskSchedulerImpl 建立時建立 SchedulableBuilder,SchedulableBuilder 根據類型分爲 FIFOSchedulableBuilder、FairSchedulableBuilder 兩類
3)最後啓動 TaskSchedulerImpl,TaskSchedulerImpl 啓動 SchedulerBackend。
    a) SchedulerBackend 啓動時建立 ApplicationDescription、DriverEndpoint、StandloneAppClient
    b) StandloneAppClient 內部包括一個 ClientEndpoint

8.3 SparkContext 簡易結構與交互關係

詳解以下:

1)SparkContext:是用戶 Spark 執行任務的上下文,用戶程序內部使用 Spark 提供的 Api 直接或間接建立一個 SparkContext。
2)SparkEnv:用戶執行的環境信息,包括通訊相關的端點。
3)RpcEnv:SparkContext 中遠程通訊環境。
4)ApplicationDescription:應用程序描述信息,主要包含 appName、maxCores、memoryPerExecutorMB、coresPerExecutor、Command (CoarseGrainedExecutorBackend)、appUiUrl 等。
5)ClientEndpoint:客戶端端點,啓動後向 Master 發起註冊 RegisterApplication 請求。
6)Master:接受 RegisterApplication 請求後,進行 Worker 資源分配,並向分配的資源發起 LaunchExecutor 指令。
7)Worker:接受 LaunchExecutor 指令後,運行 ExecutorRunner。
8)ExecutorRunner:運行 applicationDescription 的 Command 命令,最終 Executor,同時向 DriverEndpoint 註冊 Executor 信息。

8.4 Master 對 Application 資源分配

當 Master 接受 Driver 的 RegisterApplication 請求後,放入 waitingDrivers 隊列中,在同一調度中進行資源分配,分配過程以下:

詳解以下:

waitingApps 與 aliveWorkers 進行資源匹配:
1)若是 waitingApp 配置了 app.desc.coresPerExecutor:
    a) 輪詢全部有效可分配的 worker,每次分配一個 executor,executor 的核數爲 minCoresPerExecutor(app.desc.coresPerExecutor),直到不存在有效可分配資源或者 app 依賴的資源已所有被分配。
2)若是 waitingApp 沒有配置 app.desc.coresPerExecutor:
    a) 輪詢全部有效可分配的 worker,每一個 worker 分配一個 executor,executor 的核數爲從 minCoresPerExecutor(爲固定值1) 開始遞增,直到不存在有效可分配資源或者 app 依賴的資源已所有被分配。
3)其中有效可分配 worker 定義爲知足一次資源分配的 worker:
    a) cores 知足:usableWorkers(pos).coresFree - assignedCores(pos) >= minCoresPerExecutor
    b) memory 知足(若是是新的 Executor):usableWorkers(pos).memoryFree - assignedExecutors(pos) * memoryPerExecutor >= memoryPerExecutor
注意:Master 針對於 applicationInfo 進行資源分配時,只有存在有效可用的資源就直接分配,而分配剩餘的 app.coresLeft 則等下一次再進行分配。

8.5 Worker 建立 Executor


(圖解:橙色組件是 Endpoint 組件)

詳解以下:

Worker 啓動 Executor
1)在 Worker 的 tempDir 下面建立 application 以及 executor 的目錄,並 chmod 700 操做權限。
2)建立並啓動 ExecutorRunner 進行 Executor 的建立。
3)向 Master 發送 Executor 的狀態狀況。

ExecutorRnner
1)新線程【ExecutorRunner for [executorId]】讀取 ApplicationDescription 將其中 Command 轉化爲本地的 Command 命令。
2)調用 Command 並將日誌輸出至 executor 目錄下的 stdout 和 stderr 日誌文件中,Command 對應的 java 類爲 CoarseGrainedExecutorBackend。

CoarseGrainedExecutorBackend
1)建立一個 SparkEnv,建立 ExecutorEndpoint(CoarseGrainedExecutorBackend)以及 WorkerWatcher。
2)ExecutorEndpoint 建立並啓動後,向 DriverEndpoint 發送 RegisterExecutor 請求並等待返回。
3)DriverEndpoint 處理 RegisterExecutor 請求,返回 ExecutorEndpointRegister 的結果。
4)若是註冊成功,ExecutorEndpoint 內部再建立 Executor 的處理對象。

至此,Spark 運行任務的容器框架就搭建完成。

第9章 Job 提交和 Task 的拆分

在前面的章節 Client 的加載中,Spark 的 DriverRunner 已開始執行用戶任務類(好比:org.apache.spark.examples.SparkPi),下面咱們開始針對於用戶任務類(或者任務代碼)進行分析:

9.1 總體預覽

詳解以下:

1)Code:指的用戶編寫的代碼
2)RDD:彈性分佈式數據集,用戶編碼根據 SparkContext 與 RDD 的 api 可以很好的將 Code 轉化爲 RDD 數據結構(下文將作轉化細節介紹)。
3)DAGScheduler:有向無環圖調度器,將 RDD 封裝爲 JobSubmitted 對象存入 EventLoop (實現類DAGSchedulerEventProcessLoop) 隊列中。
4)EventLoop: 定時掃描未處理 JobSubmitted 對象,將 JobSubmitted 對象提交給 DAGScheduler。
5)DAGScheduler:針對於 JobSubmitted 進行處理,最終將 RDD 轉化爲執行 TaskSet,並將 TaskSet 提交至 TaskScheduler。
6)TaskScheduler: 根據 TaskSet 建立 TaskSetManager 對象存入 SchedulableBuilder 的數據池(Pool)中,並調用 DriverEndpoint 喚起消費(ReviveOffers)操做。
7)DriverEndpoint:接受 ReviveOffers 指令後將 TaskSet 中的 Tasks 根據相關規則均勻分配給Executor。
8)Executor:啓動一個 TaskRunner 執行一個 Task。

9.2 Code 轉化爲初始 RDDs

咱們的用戶代碼經過調用 Spark 的 Api(好比:SparkSession.builder.appName("Spark Pi").getOrCreate()),該 Api 會建立 Spark 的上下文(SparkContext),當咱們調用 transform 類方法(如:parallelize(),map())都會建立(或者裝飾已有的)Spark 數據結構(RDD),若是是 action 類操做(如:reduce()),那麼將最後封裝的 RDD 做爲一次 Job 提交,存入待調度隊列中(DAGSchedulerEventProcessLoop )待後續異步處理。
若是屢次調用 action 類操做,那麼封裝的多個 RDD 做爲多個 Job 提交。
流程以下:

詳解以下:

ExecuteEnv(執行環境)
1)這裏能夠是經過 spark-submit 提交的 MainClass,也能夠是 spark-shell 腳本。
2)MainClass:代碼中一定會建立或者獲取一個 SparkContext。
3)spark-shell:默認會建立一個 SparkContext。

RDD(彈性分佈式數據集)

1)create:能夠直接建立(如:sc.parallelize(1 until n, slices) ),也能夠在其餘地方讀取(如:sc.textFile("README.md"))等。
2)transformation:rdd 提供了一組 api 能夠進行對已有 RDD 進行反覆封裝成爲新的 RDD,這裏採用的是`裝飾者設計模式`,下面爲部分裝飾器類圖。
3)action:當調用 RDD 的 action 類操做方法時(collect、reduce、lookup、save ),這觸發 DAGScheduler 的 Job 提交。
4)DAGScheduler:建立一個名爲 JobSubmitted 的消息至 DAGSchedulerEventProcessLoop 阻塞消息隊列(LinkedBlockingDeque)中。
5)DAGSchedulerEventProcessLoop:啓動名爲【dag-scheduler-event-loop】的線程實時消費消息隊列。
6)【dag-scheduler-event-loop】處理完成後回調 JobWaiter。
7)DAGScheduler:打印 Job 執行結果。
8)JobSubmitted:相關代碼以下(其中 jobId 爲 DAGScheduler 全局遞增 Id)。
    eventProcessLoop.post(JobSubmitted(
            jobId, rdd, func2, partitions.toArray, callSite, waiter,
            SerializationUtils.clone(properties)))

部分裝飾器類圖


最終示例:

最終轉化的 RDD 分爲四層,每層都依賴於上層 RDD,將 ShffleRDD 封裝爲一個 Job 存入 DAGSchedulerEventProcessLoop 待處理,若是咱們的代碼中存在幾段上面示例代碼,那麼就會建立對應對的幾個 ShffleRDD 分別存入 DAGSchedulerEventProcessLoop 中。

9.3 RDD 分解爲待執行任務集合(TaskSet)

Job 提交後,DAGScheduler 根據 RDD 層次關係解析爲對應的 Stages,同時維護 Job 與 Stage 的關係。
將最上層的 Stage 根據併發關係(findMissingPartitions)分解爲多個 Task,將這個多個 Task 封裝爲 TaskSet 提交給 TaskScheduler。非最上層的 Stage 的存入處理的列表中(waitingStages += stage)
流程以下:

詳解以下:

1)DAGSchedulerEventProcessLoop中,線程【dag-scheduler-event-loop】處理到 JobSubmitted
2)調用 DAGScheduler 進行 handleJobSubmitted
    a) 首先根據 RDD 依賴關係依次建立 Stage 族,Stage 分爲 ShuffleMapStage、ResultStage 兩類,以下圖所示:
    b) 更新 jobId 與 StageId 關係 Map
    c) 建立 ActiveJob,調用 LiveListenerBug,發送 SparkListenerJobStart 指令
    d) 找到最上層 Stage 進行提交,下層 Stage 存入 waitingStage 中待後續處理
        1) 調用 OutputCommitCoordinator 進行 stageStart() 處理
        2) 調用 LiveListenerBug,發送 SparkListenerStageSubmitted 指令
        3) 調用 SparkContext的broadcast 方法獲取 Broadcast 對象,根據 Stage 類型建立對應多個 Task,一個 Stage 根據 findMissingPartitions 分爲多個對應的 Task,Task 分爲 ShuffleMapTask、ResultTask
        4) 將 Task 封裝爲 TaskSet,調用 TaskScheduler.submitTasks(taskSet) 進行 Task 調度,關鍵代碼以下:
            taskScheduler.submitTasks(new TaskSet(
                tasks.toArray, stage.id, stage.latestInfo.attemptId, jobId, properties))

ShuffleMapStage、ResultStage 兩類

9.4 TaskSet 封裝爲 TaskSetManager 並提交至 Driver

TaskScheduler 將 TaskSet 封裝爲 TaskSetManager(new TaskSetManager(this, taskSet, maxTaskFailures, blacklistTrackerOpt)),存入待處理任務池(Pool)中,發送 DriverEndpoint 喚起消費(ReviveOffers)指令。

詳解以下:

1)DAGSheduler 將 TaskSet 提交給 TaskScheduler 的實現類,這裏是 TaskChedulerImpl。
2)TaskSchedulerImpl 建立一個 TaskSetManager 管理 TaskSet,關鍵代碼以下:
    new TaskSetManager(this, taskSet, maxTaskFailures, blacklistTrackerOpt)
3)同時將 TaskSetManager 添加 SchedduableBuilder 的任務池 Poll 中。
4)調用 SchedulerBackend 的實現類進行 reviveOffers,這裏是 standlone 模式的實現類 StandaloneSchedulerBackend。
5)SchedulerBackend 發送 ReviveOffers 指令至 DriverEndpoint。

9.5 Driver 將 TaskSetManager 分解爲 TaskDescriptions 併發布任務到 Executor

Driver 接受喚起消費指令後,將全部待處理的 TaskSetManager 與 Driver 中註冊的 Executor 資源進行匹配,最終一個 TaskSetManager 獲得多個 TaskDescription 對象,按照 TaskDescription 相對應的 Executor 發送 LaunchTask 指令。

詳解以下:

當 Driver 獲取到 ReviveOffers(請求消費)指令時
1)首先根據 executorDataMap 緩存信息獲得可用的 Executor 資源信息(WorkerOffer),關鍵代碼以下:
    val activeExecutors = executorDataMap.filterKeys(executorIsAlive)
    val workOffers = activeExecutors.map { case (id, executorData) =>
        new WorkerOffer(id, executorData.executorHost, executorData.freeCores)
    }.toIndexedSeq

2)接着調用 TaskScheduler 進行資源匹配,方法定義以下:
    def resourceOffers(offers: IndexedSeq[WorkerOffer]): Seq[Seq[TaskDescription]] synchronized {..}
    a) 將 WorkerOffer 資源打亂,如:val shuffledOffers = Random.shuffle(offers)
    b) 將 Pool 中待處理的 TaskSetManager 取出,如:val sortedTaskSets = rootPool.getSortedTaskSetQueue
    c) 並循環處理 sortedTaskSets 並與 shuffledOffers 循環匹配,若是 shuffledOffers(i) 有足夠的 CPU 資源( if (availableCpus(i) >= CPUS_PER_TASK)),調用 TaskSetManager 建立 TaskDescription 對象(taskSet.resourceOffer(execId, host, maxLocality)),最終建立了多個 TaskDescription,TaskDescription 定義以下:
        new TaskDescription(
            taskId,
            attemptNum,
            execId,
            taskName,
            index,
            sched.sc.addedFiles,
            sched.sc.addedJars,
            task.localProperties,
            serializedTask)

3)若是 TaskDescriptions 不爲空,循環 TaskDescriptions,序列化 TaskDescription 對象,並向 ExecutorEndpoint 發送 LaunchTask 指令,關鍵代碼以下:
    for (task <- taskDescriptions.flatten) {
            val serializedTask = TaskDescription.encode(task)
            val executorData = executorDataMap(task.executorId)
            executorData.freeCores -= scheduler.CPUS_PER_TASK
            executorData.executorEndpoint.send(LaunchTask(new SerializableBuffer(serializedTask)))
    }

第10章 Task 執行和回饋

DriverEndpoint 最終生成多個可執行的 TaskDescription 對象,並向各個 ExecutorEndpoint 發送 LaunchTask 指令,本節內容將關注 ExecutorEndpoint 如何處理 LaunchTask 指令,處理完成後如何回饋給 DriverEndpoint,以及整個 job 最終如何屢次調度直至結束。

10.1 Task 的執行流程

Executor 接受 LaunchTask 指令後,開啓一個新線程 TaskRunner 解析 RDD,並調用 RDD 的 compute 方法,歸併函數獲得最終任務執行結果。

詳解以下:

1)ExecutorEndpoint 接受到 LaunchTask 指令後,解碼出 TaskDescription,調用 Executor 的 launchTask 方法。
2)Executor 建立一個 TaskRunner 線程,並啓動線程,同時將改線程添加到 Executor 的成員對象中,代碼以下:
    private val runningTasks = new ConcurrentHashMap[Long, TaskRunner]
    runningTasks.put(taskDescription.taskId, taskRunner)

TaskRunner
1)首先向 DriverEndpoint 發送任務最新狀態爲 RUNNING。
2)從 TaskDescription 解析出 Task,並調用 Task 的 run 方法。

Task
1)建立 TaskContext 以及 CallerContext (與 HDFS 交互的上下文對象)。
2)執行 Task 的 runTask 方法:
    a) 若是 Task 實例爲 ShuffleMapTask:解析出 RDD 以及 ShuffleDependency 信息,調用 RDD 的 compute() 方法將結果寫 Writer 中(Writer 這裏不介紹,能夠做爲黑盒理解,好比寫入一個文件中),返回 MapStatus 對象。
    b) 若是 Task 實例爲 ResultTask:解析出 RDD 以及合併函數信息,調用函數將調用後的結果返回。

TaskRunner 將 Task 執行的結果序列化,再次向 DriverEndpoint 發送任務最新狀態爲 FINISHED。

10.2 Task 的回饋流程

TaskRunner 執行結束後,都將執行狀態發送至 DriverEndpoint,DriverEndpoint 最終反饋指令 CompletionEvent 發送至 DAGSchedulerEventProcessLoop 中。

詳解以下:

1)DriverEndpoint 接收到 StatusUpdate 消息後,調用 TaskScheduler 的 statusUpdate(taskId, state, result) 方法
2)TaskScheduler 若是任務結果是完成,那麼清除該任務處理中的狀態,並調動 TaskResultGetter 相關方法,關鍵代碼以下:
    val taskSet = taskIdToTaskSetManager.get(tid)

    taskIdToTaskSetManager.remove(tid)
            taskIdToExecutorId.remove(tid).foreach { executorId =>
        executorIdToRunningTaskIds.get(executorId).foreach { _.remove(tid) }
    }
    taskSet.removeRunningTask(tid)
    if (state == TaskState.FINISHED) {
        taskResultGetter.enqueueSuccessfulTask(taskSet, tid, serializedData)
    } else if (Set(TaskState.FAILED, TaskState.KILLED, TaskState.LOST).contains(state)) {
        taskResultGetter.enqueueFailedTask(taskSet, tid, state, serializedData)
    }

TaskResultGetter 啓動線程啓動線程【task-result-getter】進行相關處理:
1)經過解析或者遠程獲取獲得 Task 的 TaskResult 對象。
2)調用 TaskSet 的 handleSuccessfulTask 方法,TaskSet 的 handleSuccessfulTask 方法直接調用 TaskSetManager 的 handleSuccessfulTask 方法。

TaskSetManager
1)更新內部 TaskInfo 對象狀態,並將該 Task 從運行中 Task 的集合刪除,代碼以下:
    val info = taskInfos(tid)
    info.markFinished(TaskState.FINISHED, clock.getTimeMillis())
    removeRunningTask(tid)
2)調用 DAGScheduler 的 taskEnded 方法,關鍵代碼以下:
    sched.dagScheduler.taskEnded(tasks(index), Success, result.value(), result.accumUpdates, info)

DAGScheduler 向 DAGSchedulerEventProcessLoop 存入 CompletionEvent 指令,CompletionEvent 對象定義以下:
    private[scheduler] case class CompletionEvent(
        task: Task[_],
        reason: TaskEndReason,
        result: Any,
        accumUpdates: Seq[AccumulatorV2[_, _]],
        taskInfo: TaskInfo)
 extends DAGSchedulerEvent

10.3 Task 的迭代流程

DAGSchedulerEventProcessLoop 中針對於 CompletionEvent 指令,調用 DAGScheduler 進行處理,DAGScheduler 更新 Stage 與該 Task 的關係狀態,若是 Stage 下 Task 都返回,則作下一層 Stage 的任務拆解與運算工做,直至 Job 被執行完畢:

詳解以下:

1)DAGSchedulerEventProcessLoop 接收到 CompletionEvent 指令後,調用 DAGScheduler 的 handleTaskCompletion 方法。
2)DAGScheduler 根據 Task 的類型分別處理。
3)若是 Task 爲 ShuffleMapTask
    a) 等待回饋的 Partitions 減去當前 partitionId
    b) 若是全部 task 都返回,則 markStageAsFinished(shuffleStage),同時向 MapOutputTrackerMaster 註冊 MapOutputs 信息,且 markMapStageJobAsFinished
    c) 調用 submitWaitingChildStages(shuffleStage) 進行下層 Stages 的處理,從而迭代處理,最終處理到 ResultTask,job 結束,關鍵代碼以下:
        private def submitWaitingChildStages(parent: Stage) {
            ...
            val childStages = waitingStages.filter(_.parents.contains(parent)).toArray
            waitingStages --= childStages
            for (stage <- childStages.sortBy(_.firstJobId)
{
                submitStage(stage)
            }
        }
4)若是 Task 爲 ResultTask
    a) 該 job 的 partitions 都已返回,則 markStageAsFinished(resultStage),並 cleanupStateForJobAndIndependentStages(job),關鍵代碼以下:
        for (stage <- stageIdToStage.get(stageId)) {
            if (runningStages.contains(stage)) {
                logDebug("Removing running stage %d".format(stageId))
                runningStages -= stage
            }
            for ((k, v) <- shuffleIdToMapStage.find(_._2 == stage)) {
                shuffleIdToMapStage.remove(k)
            }
            if (waitingStages.contains(stage)) {
                logDebug("Removing stage %d from waiting set.".format(stageId))
                waitingStages -= stage
            }
            if (failedStages.contains(stage)) {
                logDebug("Removing stage %d from failed set.".format(stageId))
                failedStages -= stage
            }
        }
        // data structures based on StageId
        stageIdToStage -= stageId
        jobIdToStageIds -= job.jobId
        jobIdToActiveJob -= job.jobId
        activeJobs -= job

至此,用戶編寫的代碼最終調用 Spark 分佈式計算完畢。

10.4 精彩圖解

Spark的交互流程 – 節點啓動


Spark的交互流程 – 應用提交

Spark的交互流程 – 任務運行

Spark的交互流程 – 任務運行

第11章 Spark 的數據存儲

Spark 計算速度遠勝於 Hadoop 的緣由之一就在於中間結果是緩存在內存而不是直接寫入到 disk,本文嘗試分析 Spark 中存儲子系統的構成,並以數據寫入和數據讀取爲例,講述清楚存儲子系統中各部件的交互關係。

11.1 存儲子系統概覽

Storage 模塊主要分爲兩層:
  1) 通訊層:storage 模塊採用的是 master-slave 結構來實現通訊層,master 和 slave 之間傳輸控制信息、狀態信息,這些都是經過通訊層來實現的。
  2) 存儲層:storage 模塊須要把數據存儲到 disk 或是 memory 上面,有可能還需 replicate(複製) 到遠端,這都是由存儲層來實現和提供相應接口。
而其餘模塊若要和 storage 模塊進行交互,storage 模塊提供了統一的操做類 BlockManager,外部類與 storage 模塊打交道都須要經過調用 BlockManager 相應接口來實現。


上圖是Spark存儲子系統中幾個主要模塊的關係示意圖,現簡要說明以下:
1)CacheManager         RDD 在進行計算的時候,經過 CacheManager 來獲取數據,並經過 CacheManager 來存儲計算結果。
2)BlockManager         CacheManager 在進行數據讀取和存取的時候主要是依賴 BlockManager 接口來操做,BlockManager 決定數據是從內存(MemoryStore) 仍是從磁盤(DiskStore) 中獲取。
3)MemoryStore          負責將數據保存在內存或從內存讀取。
4)DiskStore            負責將數據寫入磁盤或從磁盤讀入。
5)BlockManagerWorker   數據寫入本地的 MemoryStore 或 DiskStore 是一個同步操做,爲了容錯還須要將數據複製到別的計算結點,以防止數據丟失的時候還可以恢復,數據複製的操做是異步完成,由 BlockManagerWorker 來處理這一部分事情。
6)ConnectionManager    負責與其它計算結點創建鏈接,並負責數據的發送和接收。
7)BlockManagerMaster   注意該模塊只運行在 Driver Application 所在的 Executor,功能是負責記錄下全部 BlockIds 存儲在哪一個 SlaveWorker 上,好比 RDD Task 運行在機器 A,所須要的 BlockId 爲 3,但在機器 A 上沒有 BlockId 爲 3 的數值,這個時候 Slave worker 須要經過 BlockManager 向 BlockManagerMaster 詢問數據存儲的位置,而後再經過 ConnectionManager 去獲取。

11.2 啓動過程分析

上述的各個模塊由 SparkEnv 來建立,建立過程在 SparkEnv.create 中完成,代碼以下:

val blockManagerMaster = new BlockManagerMaster(registerOrLookup(
        "BlockManagerMaster",
        new BlockManagerMasterActor(isLocal, conf)), conf)
val blockManager = new BlockManager(executorId, actorSystem, blockManagerMaster, serializer, conf)

val connectionManager = blockManager.connectionManager
val broadcastManager = new BroadcastManager(isDriver, conf)
val cacheManager = new CacheManager(blockManager)

下面這段代碼容易讓人疑惑,看起來像是在全部的 cluster node 上都建立了 BlockManagerMasterActor,其實否則,仔細看 registerOrLookup 函數的實現。若是當前節點是 driver 則建立這個 actor,不然創建到 driver 的鏈接。代碼以下:

def registerOrLookup(name: String, newActor: => Actor): ActorRef = {
    if (isDriver) {
        logInfo("Registering " + name)
        actorSystem.actorOf(Props(newActor), name = name)
    } else {
        val driverHost: String = conf.get("spark.driver.host""localhost")
        val driverPort: Int = conf.getInt("spark.driver.port"7077)
        Utils.checkHost(driverHost, "Expected hostname")
        val url = s"akka.tcp://spark@$driverHost:$driverPort/user/$name"
        val timeout = AkkaUtils.lookupTimeout(conf)
        logInfo(s"Connecting to $name: $url")
        Await.result(actorSystem.actorSelection(url).resolveOne(timeout), timeout)
    }
}

初始化過程當中一個主要的動做就是 BlockManager 須要向 BlockManagerMaster 發起註冊。

11.3 通訊層


BlockManager 包裝了 BlockManagerMaster,發送信息包裝成 BlockManagerInfo。Spark 在 Driver 和 Worker 端都建立各自的 BlockManager,並經過 BlockManagerMaster 進行通訊,經過 BlockManager 對 Storage 模塊進行操做。
BlockManager 對象在 SparkEnv.create 函數中進行建立,代碼以下:
def registerOrLookupEndpoint(
        name: String, endpointCreator: => RpcEndpoint)
:
RpcEndpointRef 
= {
    if (isDriver) {
        logInfo("Registering " + name)
        rpcEnv.setupEndpoint(name, endpointCreator)
    } else {
        RpcUtils.makeDriverRef(name, conf, rpcEnv)
    }
}
......
val blockManagerMaster = new BlockManagerMaster(registerOrLookupEndpoint(
        BlockManagerMaster.DRIVER_ENDPOINT_NAME,
        new BlockManagerMasterEndpoint(rpcEnv, isLocal, conf, listenerBus)),
        conf, isDriver)

// NB: blockManager is not valid until initialize() is called later.
val blockManager = new BlockManager(executorId, rpcEnv, blockManagerMaster,
        serializer, conf, mapOutputTracker, shuffleManager, blockTransferService, securityManager,numUsableCores)

而且在建立以前對當前節點是不是 Driver 進行了判斷。若是是,則建立這個 Endpoint;不然,建立 Driver 的鏈接。

在建立 BlockManager 以後,BlockManager 會調用 initialize 方法初始化本身。而且初始化的時候,會調用 BlockManagerMaster 向 Driver 註冊本身,同時,在註冊時也啓動了Slave Endpoint。另外,向本地 shuffle 服務器註冊 Executor 配置,若是存在的話。代碼以下:

def initialize(appId: String): Unit = {
......
    master.registerBlockManager(blockManagerId, maxMemory, slaveEndpoint)

    // Register Executors' configuration with the local shuffle service, if one should exist.
    if (externalShuffleServiceEnabled && !blockManagerId.isDriver) {
        registerWithExternalShuffleServer()
    }
}

而 BlockManagerMaster 將註冊請求包裝成 RegisterBlockManager 註冊到 Driver。Driver 的 BlockManagerMasterEndpoint 會調用 register 方法,經過對消息 BlockManagerInfo 檢查,向 Driver 註冊,代碼以下:

private def register(id: BlockManagerId, maxMemSize: Long, slaveEndpoint: RpcEndpointRef) {
    val time = System.currentTimeMillis()
    if (!blockManagerInfo.contains(id)) {
        blockManagerIdByExecutor.get(id.executorId) match {
            case Some(oldId) =>
                // A block manager of the same executor already exists, so remove it (assumed dead)
                logError("Got two different block manager registrations on same executor - "
                        + s" will replace old one $oldId with new one $id")
                removeExecutor(id.executorId)
            case None =>
        }
        logInfo("Registering block manager %s with %s RAM, %s".format(
                id.hostPort, Utils.bytesToString(maxMemSize), id))

        blockManagerIdByExecutor(id.executorId) = id

        blockManagerInfo(id) 
new BlockManagerInfo(
                id, System.currentTimeMillis(), maxMemSize, slaveEndpoint)
    }
    listenerBus.post(SparkListenerBlockManagerAdded(time, id, maxMemSize))
}

不難發現 BlockManagerInfo 對象被保存到 Map 映射中。在通訊層中 BlockManagerMaster 控制着消息的流向,這裏採用了模式匹配,全部的消息模式都在 BlockManagerMessage 中。

11.4 存儲層


Spark Storage 的最小存儲單位是 block,全部的操做都是以 block 爲單位進行的。
在 BlockManager 被建立的時候 MemoryStore 和 DiskStore 對象就被建立出來了。代碼以下:
val diskBlockManager = new DiskBlockManager(this, conf)
private[spark] val memoryStore = new MemoryStore(this, maxMemory)
private[spark] val diskStore = new DiskStore(this, diskBlockManager)

11.4.1 Disk Store

因爲當前的 Spark 版本對 Disk Store 進行了更細粒度的分工,把對文件的操做提取出來放到了 DiskBlockManager 中,DiskStore 僅僅負責數據的存儲和讀取。
Disk Store 會配置多個文件目錄,Spark 會在不一樣的文件目錄下建立文件夾,其中文件夾的命名方式是:spark-UUID(隨機UUID碼)。Disk Store 在存儲的時候建立文件夾。而且根據【高內聚,低耦合】原則,這種服務型的工具代碼就放到了 Utils 中(調用路徑:DiskStore.putBytes —> DiskBlockManager.createLocalDirs —> Utils.createDirectory),代碼以下:

def createDirectory(root: String, namePrefix: String = "spark"): File = {
    var attempts = 0
    val maxAttempts = MAX_DIR_CREATION_ATTEMPTS
    var dir: File = null
    while (dir == null) 
{
        attempts += 1
        if (attempts > maxAttempts) {
            throw new IOException("Failed to create a temp directory (under " + root + ") after " +
                    maxAttempts + " attempts!")
        }
        try {
            dir = new File(root, namePrefix + "-" + UUID.randomUUID.toString)
            if (dir.exists() || !dir.mkdirs()) {
                dir = null
            }
        } catch { case e: SecurityException => dir = null; }
    }

    dir.getCanonicalFile
}

在 DiskBlockManager 裏,每一個 block 都被存儲爲一個 file,經過計算 blockId 的 hash 值,將 block 映射到文件中。

def getFile(filename: String): File = {
    // Figure out which local directory it hashes to, and which subdirectory in that
    val hash = Utils.nonNegativeHash(filename)
    val dirId = hash % localDirs.length
    val subDirId = (hash / localDirs.length) % subDirsPerLocalDir

    // Create the subdirectory if it doesn't already exist
    val subDir = subDirs(dirId).synchronized {
        val old = subDirs(dirId)(subDirId)
        if (old != null) {
            old
        } else {
            val newDir = new File(localDirs(dirId), "%02x".format(subDirId))
            if (!newDir.exists() && !newDir.mkdir()) {
                throw new IOException(s"Failed to create local dir in $newDir.")
            }
            subDirs(dirId)(subDirId) = newDir
            newDir
        }
    }

    new File(subDir, filename)
}

def getFile(blockId: BlockId): File = getFile(blockId.name)

經過 hash 值的取模運算,求出 dirId 和 subDirId。而後,在從 subDirs 中找到 subDir,若是 subDir 不存在,則建立一個新 subDir。最後,以 subDir 爲路徑,blockId 的 name 屬性爲文件名,新建該文件。
文件建立完以後,那麼 Spark 就會在 DiskStore 中向文件寫與之映射的 block,代碼以下:

override def putBytes(blockId: BlockId, _bytes: ByteBuffer, level: StorageLevel): PutResult = {
    val bytes = _bytes.duplicate()
    logDebug(s"Attempting to put block $blockId")
    val startTime = System.currentTimeMillis
    val file = diskManager.getFile(blockId)
    val channel = new FileOutputStream(file).getChannel
    Utils.tryWithSafeFinally {
        while (bytes.remaining > 0) {
            channel.write(bytes)
        }
    } {
        channel.close()
    }
    val finishTime = System.currentTimeMillis
    logDebug("Block %s stored as %s file on disk in %d ms".format(
            file.getName, Utils.bytesToString(bytes.limit)
, finishTime - startTime))
    PutResult(bytes.limit()Right(bytes.duplicate()))
}

讀取過程就簡單了,DiskStore 根據 blockId 讀取與之映射的 file 內容,固然,這中間須要從 DiskBlockManager 中獲得文件信息。代碼以下:

private def getBytes(file: File, offset: Long, length: Long): Option[ByteBuffer] = {
    val channel = new RandomAccessFile(file, "r").getChannel
    Utils.tryWithSafeFinally {
        // For small files, directly read rather than memory map
        if (length < minMemoryMapBytes) {
            val buf = ByteBuffer.allocate(length.toInt)
            channel.position(offset)
            while (buf.remaining() != 0) {
                if (channel.read(buf) == -1) {
                    throw new IOException("Reached EOF before filling buffer\n" +
                            s"offset=$offset\nfile=${file.getAbsolutePath}\nbuf.remaining=${buf.remaining}")
                }
            }
            buf.flip()
            Some(buf)
        } else {
            Some(channel.map(MapMode.READ_ONLY, offset, length))
        }
    } {
        channel.close()
    }
}

override def getBytes(blockId: BlockId): Option[ByteBuffer] = {
    val file = diskManager.getFile(blockId.name)
    getBytes(file, 0, file.length)
}

11.4.2 Memory Store

相對 Disk Store,Memory Store 就顯得容易不少。Memory Store 用一個 LinkedHashMap 來管理,其中 Key 是 blockId,Value 是 MemoryEntry 樣例類,MemoryEntry 存儲着數據信息。代碼以下:

private case class MemoryEntry(value: Any, size: Long, deserialized: Boolean)
private val entries 
new LinkedHashMap[BlockId, MemoryEntry](320.75ftrue)

在 MemoryStore 中存儲 block 的前提是當前內存有足夠的空間存放。經過對 tryToPut 函數的調用對內存空間進行判斷。代碼以下:

def putBytes(blockId: BlockId, size: Long, _bytes: () => ByteBuffer): PutResult = {
    // Work on a duplicate - since the original input might be used elsewhere.
    lazy val bytes = _bytes().duplicate().rewind().asInstanceOf[ByteBuffer]
    val putAttempt = tryToPut(blockId, () => bytes, size, deserialized = false)
    val data =
    if (putAttempt.success) {
        assert(bytes.limit == size)
        Right(bytes.duplicate())
    } else {
        null
    }
    PutResult(size, data, putAttempt.droppedBlocks)
}

在 tryToPut 函數中,經過調用 enoughFreeSpace 函數判斷內存空間。若是內存空間足夠,那麼就把 block 放到 LinkedHashMap 中;若是內存不足,那麼就告訴 BlockManager 內存不足,若是容許 Disk Store,那麼就把該 block 放到 disk 上。代碼以下:

private def tryToPut(blockId: BlockId, value: () => Any, size: Long, deserialized: Boolean): ResultWithDroppedBlocks = {
    var putSuccess = false
    val droppedBlocks = new ArrayBuffer[(BlockId, BlockStatus)]

    accountingLock.synchronized {
        val freeSpaceResult = ensureFreeSpace(blockId, size)
        val enoughFreeSpace = freeSpaceResult.success
        droppedBlocks ++= freeSpaceResult.droppedBlocks

        if (enoughFreeSpace) 
{
            val entry = new MemoryEntry(value(), size, deserialized)
            entries.synchronized {
                entries.put(blockId, entry)
                currentMemory += size
            }
            val valuesOrBytes = if (deserialized) "values" else "bytes"
            logInfo("Block %s stored as %s in memory (estimated size %s, free %s)".format(
                    blockId, valuesOrBytes, Utils.bytesToString(size), Utils.bytesToString(freeMemory)))
            putSuccess = true
        } else {
            lazy val data = if (deserialized) {
                Left(value().asInstanceOf[Array[Any]])
            } else {
                Right(value().asInstanceOf[ByteBuffer].duplicate())
            }
            val droppedBlockStatus = blockManager.dropFromMemory(blockId, () => data)
            droppedBlockStatus.foreach { status => droppedBlocks += ((blockId, status)) }
        }
        releasePendingUnrollMemoryForThisTask()
    }
    ResultWithDroppedBlocks(putSuccess, droppedBlocks)
}

Memory Store 讀取 block 也很簡單,只須要從 LinkedHashMap 中取出 blockId 的 Value 便可。代碼以下:

override def getValues(blockId: BlockId): Option[Iterator[Any]] = {
    val entry = entries.synchronized {
        entries.get(blockId)
    }
    if (entry == null) {
        None
    } else if (entry.deserialized) {
        Some(entry.value.asInstanceOf[Array[Any]].iterator)
    } else {
        val buffer = entry.value.asInstanceOf[ByteBuffer].duplicate() // Doesn't actually copy data
        Some(blockManager.dataDeserialize(blockId, buffer))
    }
}

11.5 數據寫入過程分析


數據寫入的簡要流程:
1)RDD.iterator 是與 storage 子系統交互的入口。
2)CacheManager.getOrCompute 調用 BlockManager 的 put 接口來寫入數據。
3)數據優先寫入到 MemoryStore 即內存,若是 MemoryStore 中的數據已滿則將最近使用次數不頻繁的數據寫入到磁盤。
4)通知 BlockManagerMaster 有新的數據寫入,在 BlockManagerMaster 中保存元數據。
5)將寫入的數據與其它 slave worker 進行同步,通常來講在本機寫入的數據,都會另先一臺機器來進行數據的備份,即 replicanumber=1
其實,咱們在 put 和 get block 的時候並無那麼複雜,前面的細節 BlockManager 都包裝好了,咱們只須要調用 BlockManager 中的 put 和 get 函數便可。

代碼以下:

def putBytes(
           blockId: BlockId,
           bytes: ByteBuffer,
           level: StorageLevel,
           tellMaster: Boolean = true,
           effectiveStorageLevel: Option[StorageLevel] = None)
: Seq[(BlockId, BlockStatus)
= {
       require(bytes != null"Bytes is null")
       doPut(blockId, ByteBufferValues(bytes), level, tellMaster, effectiveStorageLevel)
   }
   private def doPut(
           blockId: BlockId,
           data: BlockValues,
           level: StorageLevel,
           tellMaster: Boolean = true,
           effectiveStorageLevel: Option[StorageLevel] = None)

: Seq[(BlockId, BlockStatus)
= {

       require(blockId != null"BlockId is null")
       require(level != null && level.isValid, "StorageLevel is null or invalid")
       effectiveStorageLevel.foreach { level =>
           require(level != null && level.isValid, "Effective StorageLevel is null or invalid")
       }

       val updatedBlocks = new ArrayBuffer[(BlockId, BlockStatus)]

       val putBlockInfo = {
               val tinfo = new BlockInfo(level, tellMaster)
               val oldBlockOpt = blockInfo.putIfAbsent(blockId, tinfo)
       if (oldBlockOpt.isDefined) {
           if (oldBlockOpt.get.waitForReady()) {
               logWarning(s"Block $blockId already exists on this machine; not re-adding it")
               return updatedBlocks
           }
           oldBlockOpt.get
       } else {
           tinfo
       }
}

       val startTimeMs = System.currentTimeMillis

       var valuesAfterPut: Iterator[Any] = null

       var bytesAfterPut: ByteBuffer = null

       var size = 0L

       val putLevel = effectiveStorageLevel.getOrElse(level)

       val replicationFuture = data match {
           case b: ByteBufferValues if putLevel.replication > 1 =>
               // Duplicate doesn't copy the bytes, but just creates a wrapper
               val bufferView = b.buffer.duplicate()
               Future {
               replicate(blockId, bufferView, putLevel)
           }(futureExecutionContext)
           case _ => null
       }

       putBlockInfo.synchronized {
           logTrace("Put for block %s took %s to get into synchronized block"
                   .format(blockId, Utils.getUsedTimeMs(startTimeMs)))

           var marked = false
           try {
               val (returnValues, blockStore: BlockStore) = {
                   if (putLevel.useMemory) {
                       (true, memoryStore)
                   } else if (putLevel.useOffHeap) {
                       (false, externalBlockStore)
                   } else if (putLevel.useDisk) {
                       (putLevel.replication > 1, diskStore)
                   } else {
                       assert(putLevel == StorageLevel.NONE)
                       throw new BlockException(
                               blockId, s"Attempted to put block $blockId without specifying storage level!")
                   }
               }

               val result = data match {
                   case IteratorValues(iterator) =>
                       blockStore.putIterator(blockId, iterator, putLevel, returnValues)
                   case ArrayValues(array) =>
                       blockStore.putArray(blockId, array, putLevel, returnValues)
                   case ByteBufferValues(bytes) =>
                       bytes.rewind()
                       blockStore.putBytes(blockId, bytes, putLevel)
               }
               size = result.size
               result.data match {
                   case Left (newIterator) if putLevel.useMemory => valuesAfterPut = newIterator
                   case Right (newBytes) 
=> bytesAfterPut = newBytes
                   case _ =>
               }

               if (putLevel.useMemory) {
                   result.droppedBlocks.foreach { updatedBlocks += _ }
               }

               val putBlockStatus = getCurrentBlockStatus(blockId, putBlockInfo)
               if (putBlockStatus.storageLevel != StorageLevel.NONE) {
                   marked = true
                   putBlockInfo.markReady(size)
                   if (tellMaster) {
                       reportBlockStatus(blockId, putBlockInfo, putBlockStatus)
                   }
                   updatedBlocks += ((blockId, putBlockStatus))
               }
           } finally {
               if (!marked) {
                   blockInfo.remove(blockId)
                   putBlockInfo.markFailure()
                   logWarning(s"Putting block $blockId failed")
               }
           }
       }
       logDebug("Put block %s locally took %s".format(blockId, Utils.getUsedTimeMs(startTimeMs)))

       if (putLevel.replication > 1) {
           data match {
               case ByteBufferValues(bytes) =>
                   if (replicationFuture != null) {
                       Await.ready(replicationFuture, Duration.Inf)
                   }
               case _ =>
                   val remoteStartTime = System.currentTimeMillis
                   if (bytesAfterPut == null) 
{
                       if (valuesAfterPut == null) {
                           throw new SparkException(
                                   "Underlying put returned neither an Iterator nor bytes! This shouldn't happen.")
                       }
                       bytesAfterPut = dataSerialize(blockId, valuesAfterPut)
                   }
                   replicate(blockId, bytesAfterPut, putLevel)
                   logDebug("Put block %s remotely took %s"
                           .format(blockId, Utils.getUsedTimeMs(remoteStartTime)))
           }
       }

       BlockManager.dispose(bytesAfterPut)

       if (putLevel.replication > 1) {
           logDebug("Putting block %s with replication took %s"
                   .format(blockId, Utils.getUsedTimeMs(startTimeMs)))
       } else {
           logDebug("Putting block %s without replication took %s"
                   .format(blockId, Utils.getUsedTimeMs(startTimeMs)))
       }

       updatedBlocks
   }

對於 doPut 函數,主要作了如下幾個操做:
  1)建立 BlockInfo 對象存儲 block 信息。
  2)將 BlockInfo 加鎖,而後根據 Storage Level 判斷存儲到 Memory 仍是 Disk。同時,對於已經準備好讀的 BlockInfo 要進行解鎖。
  3)根據 block 的副本數量決定是否向遠程發送副本。

11.5.1 序列化與否

寫入的具體內容能夠是序列化以後的 bytes 也能夠是沒有序列化的 value. 此處有一個對 scala 的語法中 Either, Left, Right 關鍵字的理解。

11.6 數據讀取過程分析

def get(blockId: BlockId): Option[Iterator[Any]] = {
    val local = getLocal(blockId)
    if (local.isDefined) {
        logInfo("Found block %s locally".format(blockId))
        return local
    }
    val remote = getRemote(blockId)
    if (remote.isDefined) {
        logInfo("Found block %s remotely".format(blockId))
        return remote
    }
    None
}

11.6.1 本地讀取

首先在查詢本機的 MemoryStore 和 DiskStore 中是否有所須要的 block 數據存在,若是沒有則發起遠程數據獲取。

11.6.2 遠程讀取

遠程獲取調用路徑, getRemote --> doGetRemote, 在 doGetRemote 中最主要的就是調用 BlockManagerWorker.syncGetBlock 來從遠程得到數據。

def syncGetBlock(msg: GetBlock, toConnManagerId: ConnectionManagerId): ByteBuffer = {
    val blockManager = blockManagerWorker.blockManager
    val connectionManager = blockManager.connectionManager
    val blockMessage = BlockMessage.fromGetBlock(msg)
    val blockMessageArray = new BlockMessageArray(blockMessage)
    val responseMessage = connectionManager.sendMessageReliablySync(
            toConnManagerId, blockMessageArray.toBufferMessage)
    responseMessage match {
        case Some(message) => {
            val bufferMessage = message.asInstanceOf[BufferMessage]
            logDebug("Response message received " + bufferMessage)
            BlockMessageArray.fromBufferMessage(bufferMessage).foreach(blockMessage => {
                    logDebug("Found " + blockMessage)
            return blockMessage.getData
      })
        }
        case None => logDebug("No response message received")
    }
    null
}

上述這段代碼中最有意思的莫過於 sendMessageReliablySync,遠程數據讀取毫無疑問是一個異步 i/o 操做,這裏的代碼怎麼寫起來就像是在進行同步的操做同樣呢。也就是說如何知道對方發送回來響應的呢?
別急,繼續去看看 sendMessageReliablySync 的定義:

def sendMessageReliably(connectionManagerId: ConnectionManagerId, message: Message)
  : Future[Option[Message]] 
= {
    val promise = Promise[Option[Message]]
    val status = new MessageStatus(
            message, connectionManagerId, s => promise.success(s.ackMessage))
    messageStatuses.synchronized {
        messageStatuses += ((message.id, status))
    }
    sendMessage(connectionManagerId, message)
    promise.future
}

要是我說祕密在這裏,你確定會說我在扯淡,但確實在此處。注意到關鍵字 Promise 和 Future 沒?
若是這個 future 執行完畢,返回 s.ackMessage。咱們再看看這個 ackMessage 是在什麼地方被寫入的呢。看一看 ConnectionManager.handleMessage 中的代碼片斷:

case bufferMessage: BufferMessage =>

{
    if (authEnabled) {
        val res = handleAuthentication(connection, bufferMessage)
        if (res == true) {
            // message was security negotiation so skip the rest
            logDebug("After handleAuth result was true, returning")
            return
        }
    }
    if (bufferMessage.hasAckId) {
        val sentMessageStatus = messageStatuses. synchronized {
            messageStatuses.get(bufferMessage.ackId) match {
                case Some(status) =>{
                    messageStatuses -= bufferMessage.ackId
                    status
                }
                case None =>{
                    throw new Exception("Could not find reference for received ack message " +
                            message.id)
                    null
                }
            }
        }
        sentMessageStatus. synchronized {
            sentMessageStatus.ackMessage = Some(message)
            sentMessageStatus.attempted = true
            sentMessageStatus.acked = true
            sentMessageStaus.markDone()
        }
    }
}

注意:此處的所調用的 sentMessageStatus.markDone 就會調用在 sendMessageReliablySync 中定義的 promise.Success,不妨看看 MessageStatus 的定義。

class MessageStatus(
val messageMessage,
val connectionManagerIdConnectionManagerId,
completionHandlerMessageStatus 
=> Unit) {

    var ackMessage: Option[Message] = None
    var attempted = false
    var acked = false

    def markDone() 
{ completionHandler(this) }
}

11.7 Partition 如何轉化爲 Block

在 storage 模塊裏面全部的操做都是和 block 相關的,可是在 RDD 裏面全部的運算都是基於 partition 的,那麼 partition 是如何與 block 對應上的呢?
RDD 計算的核心函數是 iterator() 函數:

final def iterator(split: Partition, context: TaskContext): Iterator[T] = {
    if (storageLevel != StorageLevel.NONE) {
        SparkEnv.get.cacheManager.getOrCompute(this, split, context, storageLevel)
    } else {
        computeOrReadCheckpoint(split, context)
    }
}

若是當前 RDD 的 storage level 不是 NONE 的話,表示該 RDD 在 BlockManager 中有存儲,那麼調用 CacheManager 中的 getOrCompute() 函數計算 RDD,在這個函數中 partition 和 block 發生了關係:
首先根據 RDD id 和 partition index 構造出 block id (rdd_xx_xx),接着從 BlockManager 中取出相應的 block。
  若是該 block 存在,表示此 RDD 在以前已經被計算過和存儲在 BlockManager 中,所以取出便可,無需再從新計算。
  若是該 block 不存在則須要調用 RDD 的 computeOrReadCheckpoint() 函數計算出新的 block,並將其存儲到 BlockManager 中。
須要注意的是 block 的計算和存儲是阻塞的,若另外一線程也須要用到此 block 則需等到該線程 block 的 loading 結束。

def getOrCompute[T](rdd:RDD[T],split:Partition,context:TaskContext,storageLevel:StorageLevel):Iterator[T]=
{
    val key = "rdd_%d_%d".format(rdd.id, split.index)
    logDebug("Looking for partition " + key)
    blockManager.get(key) match {
    case Some(values) =>
        // Partition is already materialized, so just return its values
        return values.asInstanceOf[Iterator[T]]

    case None =>
        // Mark the split as loading (unless someone else marks it first)
        loading. synchronized {
        if (loading.contains(key)) {
            logInfo("Another thread is loading %s, waiting for it to finish...".format(key))
            while (loading.contains(key)) {
                try {
                    loading.wait()
                } catch {
                    case _:
                        Throwable =>}
            }
            logInfo("Finished waiting for %s".format(key))
            // See whether someone else has successfully loaded it. The main way this would fail
            // is for the RDD-level cache eviction policy if someone else has loaded the same RDD
            // partition but we didn't want to make space for it. However, that case is unlikely
            // because it's unlikely that two threads would work on the same RDD partition. One
            // downside of the current code is that threads wait serially if this does happen.
            blockManager.get(key) match {
                case Some(values) =>
                    return values.asInstanceOf[Iterator[T]]
                case None =>
                    logInfo("Whoever was loading %s failed; we'll try it ourselves".format(key))
                    loading.add(key)
            }
        } else {
            loading.add(key)
        }
    }
    try {
        // If we got here, we have to load the split
        logInfo("Partition %s not found, computing it".format(key))
        val computedValues = rdd.computeOrReadCheckpoint(split, context)
        // Persist the result, so long as the task is not running locally
        if (context.runningLocally) {
            return computedValues
        }
        val elements = new ArrayBuffer[Any]
        elements++ = computedValues
        blockManager.put(key, elements, storageLevel, true)
        return elements.iterator.asInstanceOf[Iterator[T]]
    } finally {
        loading. synchronized {
            loading.remove(key)
            loading.notifyAll()
        }
    }
}

這樣 RDD 的 transformation、action 就和 block 數據創建了聯繫,雖然抽象上咱們的操做是在 partition 層面上進行的,可是 partitio n最終仍是被映射成爲 block,所以實際上咱們的全部操做都是對 block 的處理和存取。

11.8 partition 和 block 的對應關係

在 RDD 中,核心的函數是 iterator:

final def iterator(split: Partition, context: TaskContext): Iterator[T] = {
    if (storageLevel != StorageLevel.NONE) {
        SparkEnv.get.cacheManager.getOrCompute(this, split, context, storageLevel)
    } else {
        computeOrReadCheckpoint(split, context)
    }
}

若是當前 RDD 的 storage level 不是 NONE 的話,表示該 RDD 在 BlockManager 中有存儲,那麼調用 CacheManager 中的 getOrCompute 函數計算 RDD,在這個函數中 partition 和 block 就對應起來了:
  getOrCompute 函數會先構造 RDDBlockId,其中 RDDBlockId 就把 block 和 partition 聯繫起來了,RDDBlockId 產生的 name 就是 BlockId 的 name 屬性,形式是:rdd_rdd.id_partition.index。

def getOrCompute[T](
rdd: RDD[T],
partition: Partition,
context: TaskContext,
storageLevel: StorageLevel): Iterator[T] = {

    val key = RDDBlockId(rdd.id, partition.index)
    logDebug(s"Looking for partition $key")
    blockManager.get(key) match {
        case Some(blockResult) =>
            val existingMetrics = context.taskMetrics
                    .getInputMetricsForReadMethod(blockResult.readMethod)
            existingMetrics.incBytesRead(blockResult.bytes)

            val iter = blockResult.data.asInstanceOf[Iterator[T]]
            new InterruptibleIterator[T](context, iter) {
            override def next(): T = {
                    existingMetrics.incRecordsRead(1)
                    delegate.next()
            }
        }
        case None =>
            val storedValues = acquireLockForPartition[T](key)
            if (storedValues.isDefined) {
                return new InterruptibleIterator[T](context, storedValues.get)
            }

            try {
                logInfo(s"Partition $key not found, computing it")
                val computedValues = rdd.computeOrReadCheckpoint(partition, context)

                if (context.isRunningLocally) {
                    return computedValues
                }

                val updatedBlocks = new ArrayBuffer[(BlockId, BlockStatus)]
                val cachedValues = putInBlockManager(key, computedValues, storageLevel, updatedBlocks)
                val metrics = context.taskMetrics
                val lastUpdatedBlocks = metrics.updatedBlocks.getOrElse(Seq[(BlockId, BlockStatus)]())
                metrics.updatedBlocks = Some(lastUpdatedBlocks ++ updatedBlocks.toSeq)
                new InterruptibleIterator(context, cachedValues)

            } finally {
                loading.synchronized {
                    loading.remove(key)
                    loading.notifyAll()
                }
            }
    }
}

同時 getOrCompute 函數會對 block 進行判斷:
  若是該 block 存在,表示此 RDD 在以前已經被計算過和存儲在 BlockManager 中,所以取出便可,無需再從新計算。
  若是該 block 不存在則須要調用 RDD 的 computeOrReadCheckpoint() 函數計算出新的block,並將其存儲到 BlockManager 中。
  須要注意的是 block 的計算和存儲是阻塞的,若另外一線程也須要用到此 block 則需等到該線程 block 的 loading 結束。

第12章 Spark Shuffle 過程

12.1 MapReduce 的 Shuffle 過程介紹

  Shuffle 的本義是洗牌、混洗,把一組有必定規則的數據儘可能轉換成一組無規則的數據,越隨機越好。MapReduce 中的 Shuffle 更像是洗牌的逆過程,把一組無規則的數據儘可能轉換成一組具備必定規則的數據。
  爲何 MapReduce 計算模型須要 Shuffle 過程?咱們都知道 MapReduce 計算模型通常包括兩個重要的階段:Map 是映射,負責數據的過濾分發;Reduce 是規約,負責數據的計算歸併。Reduce 的數據來源於 Map,Map 的輸出便是 Reduce 的輸入,Reduce 須要經過 Shuffle來 獲取數據。
  從 Map 輸出到 Reduce 輸入的整個過程能夠廣義地稱爲 Shuffle。Shuffle 橫跨 Map 端和 Reduce 端,在 Map 端包括 Spill 過程,在 Reduce 端包括 copy 和 sort 過程,如圖所示:
  

12.1.1 Spill 過程(刷寫過程)

  Spill 過程包括輸出、排序、溢寫、合併等步驟,如圖所示:
  

Collect

  每一個 Map 任務不斷地以 <key, value> 對的形式把數據輸出到內存中構造的一個環形數據結構中。使用環形數據結構是爲了更有效地使用內存空間,在內存中放置儘量多的數據。
  這個數據結構其實就是個字節數組,叫 kvbuffer,名如其義,可是這裏面不光放置了 <key, value> 數據,還放置了一些索引數據,給放置索引數據的區域起了一個 kvmeta 的別名,在 kvbuffer 的一塊區域上穿了一個 IntBuffer(字節序採用的是平臺自身的字節序)的馬甲。<key, value> 數據區域和索引數據區域在 kvbuffer 中是相鄰不重疊的兩個區域,用一個分界點來劃分二者,分界點不是亙古不變的,而是每次 Spill 以後都會更新一次。初始的分界點是 0,<key, value> 數據的存儲方向是向上增加,索引數據的存儲方向是向下增加,如圖所示:
  

  kvbuffer 的存放指針 bufindex 是一直悶着頭地向上增加,好比 bufindex 初始值爲 0,一個 Int 型的 key 寫完以後,bufindex 增加爲 4,一個 Int 型的 value 寫完以後,bufindex 增加爲 8。
  索引是對 <key, value> 在 kvbuffer 中的索引,是個四元組,包括:value 的起始位置、key 的起始位置、partition 值、value 的長度,佔用四個 Int 長度,kvmeta 的存放指針 kvindex 每次都是向下跳四個「格子」,而後再向上一個格子一個格子地填充四元組的數據。好比 Kvindex 初始位置是 -4,當第一個 <key, value> 寫完以後,(kvindex+0) 的位置存放 value 的起始位置、(kvindex+1) 的位置存放 key 的起始位置、(kvindex+2) 的位置存放 partition 的值、(kvindex+3) 的位置存放 value 的長度,而後 kvindex 跳到 -8 位置,等第二個 <key, value> 和索引寫完以後,kvindex 跳到-32 位置。

  kvbuffer 的大小雖然能夠經過參數設置,可是總共就那麼大,<key, value> 和索引不斷地增長,加着加着,kvbuffer 總有不夠用的那天,那怎麼辦?把數據從內存刷到磁盤上再接着往內存寫數據,把 kvbuffer 中的數據刷到磁盤上的過程就叫 Spill,多麼明瞭的叫法,內存中的數據滿了就自動地 spill 到具備更大空間的磁盤。

  關於 Spill 觸發的條件,也就是 kvbuffer 用到什麼程度開始 Spill,仍是要講究一下的。若是把 kvbuffer 用得死死得,一點縫都不剩的時候再開始 Spill,那 Map 任務就須要等 Spill 完成騰出空間以後才能繼續寫數據;若是 kvbuffer 只是滿到必定程度,好比 80% 的時候就開始 Spill,那在 Spill 的同時,Map 任務還能繼續寫數據,若是 Spill 夠快,Map 可能都不須要爲空閒空間而發愁。兩利相衡取其大,通常選擇後者。

  Spill 這個重要的過程是由 Spill 線程承擔,Spill 線程從 Map 任務接到「命令」以後就開始正式幹活,乾的活叫 SortAndSpill,原來不只僅是 Spill,在 Spill 以前還有個頗具爭議性的 Sort。

Sort

  先把 kvbuffer 中的數據按照 partition 值和 key 兩個關鍵字升序排序,移動的只是索引數據,排序結果是 kvmeta 中數據按照 partition 爲單位彙集在一塊兒,同一 partition 內的按照 key 有序。

Spill

  Spill 線程爲此次 Spill 過程建立一個磁盤文件:從全部的本地目錄中輪詢查找能存儲這麼大空間的目錄,找到以後在其中建立一個相似於 「spill12.out」 的文件。Spill 線程根據排過序的 kvmeta 挨個 partition 的把 <key, value> 數據吐到這個文件中,一個 partition 對應的數據吐完以後順序地吐下個 partition,直到把全部的 partition 遍歷完。一個 partition 在文件中對應的數據也叫段 (segment)。

  全部的 partition 對應的數據都放在這個文件裏,雖然是順序存放的,可是怎麼直接知道某個 partition 在這個文件中存放的起始位置呢?強大的索引又出場了。有一個三元組記錄某個 partition 對應的數據在這個文件中的索引:起始位置、原始數據長度、壓縮以後的數據長度,一個 partition 對應一個三元組。而後把這些索引信息存放在內存中,若是內存中放不下了,後續的索引信息就須要寫到磁盤文件中了:從全部的本地目錄中輪訓查找能存儲這麼大空間的目錄,找到以後在其中建立一個相似於 「spill12.out.index」 的文件,文件中不光存儲了索引數據,還存儲了 crc32 的校驗數據。(spill12.out.index 不必定在磁盤上建立,若是內存(默認 1M 空間)中能放得下就放在內存中,即便在磁盤上建立了,和 spill12.out 文件也不必定在同一個目錄下。)

  每一次 Spill 過程就會最少生成一個 out 文件,有時還會生成 index 文件,Spill 的次數也烙印在文件名中。索引文件和數據文件的對應關係以下圖所示:
  

  在 Spill 線程如火如荼的進行 SortAndSpill 工做的同時,Map 任務不會所以而停歇,而是一無既往地進行着數據輸出。Map 仍是把數據寫到 kvbuffer 中,那問題就來了:<key, value> 只顧着悶頭按照 bufindex 指針向上增加,kvmeta 只顧着按照 kvindex 向下增加,是保持指針起始位置不變繼續跑呢,仍是另謀它路?若是保持指針起始位置不變,很快 bufindex 和 kvindex 就碰頭了,碰頭以後再從新開始或者移動內存都比較麻煩,不可取。Map 取 kvbuffer 中剩餘空間的中間位置,用這個位置設置爲新的分界點,bufindex 指針移動到這個分界點,kvindex 移動到這個分界點的 -16 位置,而後二者就能夠和諧地按照本身既定的軌跡放置數據了,當 Spill 完成,空間騰出以後,不須要作任何改動繼續前進。分界點的轉換以下圖所示:
  


  Map 任務總要把輸出的數據寫到磁盤上,即便輸出數據量很小在內存中所有能裝得下,在最後也會把數據刷到磁盤上。

 

12.1.2 Merge

  


  Map 任務若是輸出數據量很大,可能會進行好幾回 Spill,out 文件和 Index 文件會產生不少,分佈在不一樣的磁盤上。最後把這些文件進行合併的 merge 過程閃亮登場。
  Merge 過程怎麼知道產生的 Spill 文件都在哪了呢?從全部的本地目錄上掃描獲得產生的 Spill 文件,而後把路徑存儲在一個數組裏。Merge 過程又怎麼知道 Spill 的索引信息呢?沒錯,也是從全部的本地目錄上掃描獲得 Index 文件,而後把索引信息存儲在一個列表裏。到這裏,又遇到了一個值得納悶的地方。在以前 Spill 過程當中的時候爲何不直接把這些信息存儲在內存中呢,何須又多了這步掃描的操做?特別是 Spill 的索引數據,以前當內存超限以後就把數據寫到磁盤,如今又要從磁盤把這些數據讀出來,仍是須要裝到更多的內存中。之因此畫蛇添足,是由於這時 kvbuffer 這個內存大戶已經再也不使用能夠回收,有內存空間來裝這些數據了。(對於內存空間較大的土豪來講,用內存來省卻這兩個 io 步驟仍是值得考慮的。)

 

  而後爲 merge 過程建立一個叫 file.out 的文件和一個叫 file.out.Index 的文件用來存儲最終的輸出和索引。
  一個 partition 一個 partition 的進行合併輸出。對於某個 partition 來講,從索引列表中查詢這個 partition 對應的全部索引信息,每一個對應一個段插入到段列表中。也就是這個 partition 對應一個段列表,記錄全部的 Spill 文件中對應的這個 partition 那段數據的文件名、起始位置、長度等等。
  而後對這個 partition 對應的全部的 segment 進行合併,目標是合併成一個 segment。當這個 partition 對應不少個 segment 時,會分批地進行合併:先從 segment 列表中把第一批取出來,以 key 爲關鍵字放置成最小堆,而後從最小堆中每次取出最小的 <key, value> 輸出到一個臨時文件中,這樣就把這一批段合併成一個臨時的段,把它加回到 segment 列表中;再從 segment 列表中把第二批取出來合併輸出到一個臨時 segment,把其加入到列表中;這樣往復執行,直到剩下的段是一批,輸出到最終的文件中。
  最終的索引數據仍然輸出到 Index 文件中。
  Map 端的 Shuffle 過程到此結束。

12.1.3 Copy

  Reduce 任務經過 HTTP 向各個 Map 任務拖取它所須要的數據。每一個節點都會啓動一個常駐的 HTTP server,其中一項服務就是響應 Reduce 拖取 Map 數據。當有 MapOutput 的 HTTP 請求過來的時候,HTTP server 就讀取相應的 Map 輸出文件中對應這個 Reduce 部分的數據經過網絡流輸出給 Reduce。
  Reduce 任務拖取某個 Map 對應的數據,若是在內存中能放得下此次數據的話就直接把數據寫到內存中。Reduce 要向每一個 Map 去拖取數據,在內存中每一個 Map 對應一塊數據,當內存中存儲的 Map 數據佔用空間達到必定程度的時候,開始啓動內存中 merge,把內存中的數據 merge 輸出到磁盤上一個文件中。
  若是在內存中不能放得下這個 Map 的數據的話,直接把 Map 數據寫到磁盤上,在本地目錄建立一個文件,從 HTTP 流中讀取數據而後寫到磁盤,使用的緩存區大小是 64K。拖一個 Map 數據過來就會建立一個文件,當文件數量達到必定閾值時,開始啓動磁盤文件 merge,把這些文件合併輸出到一個文件。
  有些 Map 的數據較小是能夠放在內存中的,有些 Map 的數據較大須要放在磁盤上,這樣最後 Reduce 任務拖過來的數據有些放在內存中了有些放在磁盤上,最後會對這些來一個全局合併。

12.1.4 Merge Sort

  這裏使用的 Merge 和 Map 端使用的 Merge 過程同樣。Map 的輸出數據已是有序的,Merge 進行一次合併排序,所謂 Reduce 端的 sort 過程就是這個合併的過程。通常 Reduce 是一邊 copy 一邊 sort,即 copy 和 sort 兩個階段是重疊而不是徹底分開的。
  Reduce 端的 Shuffle 過程至此結束。

12.2 HashShuffle 過程介紹

  Spark 豐富了任務類型,有些任務之間數據流轉不須要經過 Shuffle,可是有些任務之間仍是須要經過 Shuffle 來傳遞數據,好比 wide dependency 的 group by key。
  Spark 中須要 Shuffle 輸出的 Map 任務會爲每一個 Reduce 建立對應的 bucket,Map 產生的結果會根據設置的 partitioner 獲得對應的 bucketId,而後填充到相應的 bucket 中去。每一個 Map 的輸出結果可能包含全部的 Reduce 所須要的數據,因此每一個 Map 會建立 R 個 bucket(R 是 reduce 的個數),M 個 Map 總共會建立 M*R 個 bucket。
  Map 建立的 bucket 其實對應磁盤上的一個文件,Map 的結果寫到每一個 bucket 中其實就是寫到那個磁盤文件中,這個文件也被稱爲 blockFile,是 Disk Block Manager 管理器經過文件名的 Hash 值對應到本地目錄的子目錄中建立的。每一個 Map 要在節點上建立 R 個磁盤文件用於結果輸出,Map 的結果是直接輸出到磁盤文件上的,100KB 的內存緩衝是用來建立 Fast Buffered OutputStream 輸出流。這種方式一個問題就是 Shuffle 文件過多。
  


  1)每個 Mapper 建立出和 Reducer 數目相同的 bucket,bucket 其實是一個 buffer,其大小爲 spark.shuffle.file.buffer.kb(默認 32KB)。
  2)Mapper 產生的結果會根據設置的 partition 算法填充到每一個 bucket 中去,而後再寫入到磁盤文件。
  3)Reducer 從遠端或是本地的 block manager 中找到相應的文件讀取數據。

 

  針對上述 Shuffle 過程產生的文件過多問題,Spark 有另一種改進的 Shuffle 過程:consolidation Shuffle,以期顯著減小 Shuffle 文件的數量。在 consolidation Shuffle 中每一個 bucket 並不是對應一個文件,而是對應文件中的一個 segment 部分。Job 的 map 在某個節點上第一次執行,爲每一個 reduce 建立 bucke 對應的輸出文件,把這些文件組織成 ShuffleFileGroup,當此次 map 執行完以後,這個 ShuffleFileGroup 能夠釋放爲下次循環利用;當又有 map 在這個節點上執行時,不須要建立新的 bucket 文件,而是在上次的 ShuffleFileGroup 中取得已經建立的文件繼續追加寫一個 segment;當前次 map 還沒執行完,ShuffleFileGroup 尚未釋放,這時若是有新的 map 在這個節點上執行,沒法循環利用這個 ShuffleFileGroup,而是隻能建立新的 bucket 文件組成新的 ShuffleFileGroup 來寫輸出。
  

  好比一個 Job 有 3 個 Map 和 2 個 reduce:
  (1) 若是此時集羣有 3 個節點有空槽,每一個節點空閒了一個 core,則 3 個 Map 會調度到這 3 個節點上執行,每一個 Map 都會建立 2 個 Shuffle 文件,總共建立 6 個 Shuffle 文件;
  (2) 若是此時集羣有 2 個節點有空槽,每一個節點空閒了一個 core,則 2 個 Map 先調度到這 2 個節點上執行,每一個 Map 都會建立 2 個 Shuffle 文件,而後其中一個節點執行完 Map 以後又調度執行另外一個 Map,則這個 Map 不會建立新的 Shuffle 文件,而是把結果輸出追加到以前 Map 建立的 Shuffle 文件中;總共建立 4 個 Shuffle 文件;
  (3) 若是此時集羣有 2 個節點有空槽,一個節點有 2 個空 core 一個節點有 1 個空 core,則一個節點調度 2 個 Map 一個節點調度 1 個 Map,調度 2 個 Map 的節點上,一個 Map 建立了 Shuffle 文件,後面的 Map 仍是會建立新的 Shuffle 文件,由於上一個 Map 還正在寫,它建立的 ShuffleFileGroup 尚未釋放;總共建立 6 個 Shuffle 文件。
優勢
  1)快-不須要排序,也不須要維持 hash 表
  2)不須要額外空間用做排序
  3)不須要額外IO-數據寫入磁盤只需一次,讀取也只需一次
缺點
  1)當 partitions 大時,輸出大量的文件(cores * R),性能開始下降
  2)大量的文件寫入,使文件系統開始變爲隨機寫,性能比順序寫要下降 100 倍
  3)緩存空間佔用比較大
  Reduce 去拖 Map 的輸出數據,Spark 提供了兩套不一樣的拉取數據框架:經過 socket 鏈接去取數據;使用n etty 框架去取數據。
  每一個節點的 Executor 會建立一個 BlockManager,其中會建立一個 BlockManagerWorker 用於響應請求。當 Reduce 的 GET_BLOCK 的請求過來時,讀取本地文件將這個 blockId 的數據返回給 Reduce。若是使用的是 Netty 框架,BlockManager 會建立 ShuffleSender 用於發送 Shuffle 數據。

  並非全部的數據都是經過網絡讀取,對於在本節點的 Map 數據,Reduce 直接去磁盤上讀取而再也不經過網絡框架。
  Reduce 拖過來數據以後以什麼方式存儲呢?Spark Map 輸出的數據沒有通過排序,Spark Shuffle 過來的數據也不會進行排序,Spark 認爲 Shuffle 過程當中的排序不是必須的,並非全部類型的 Reduce 須要的數據都須要排序,強制地進行排序只會增長 Shuffle 的負擔。Reduce 拖過來的數據會放在一個 HashMap 中,HashMap 中存儲的也是 <key, value> 對,key 是 Map 輸出的 key,Map 輸出對應這個 key 的全部 value 組成 HashMap 的 value。Spark 將 Shuffle 取過來的每個 <key, value> 對插入或者更新到 HashMap 中,來一個處理一個。HashMap 所有放在內存中。
  Shuffle 取過來的數據所有存放在內存中,對於數據量比較小或者已經在 Map 端作過合併處理的 Shuffle 數據,佔用內存空間不會太大,可是對於好比 group by key 這樣的操做,Reduce 須要獲得 key 對應的全部 value,並將這些 value 組一個數組放在內存中,這樣當數據量較大時,就須要較多內存。
  當內存不夠時,要不就失敗,要不就用老辦法把內存中的數據移到磁盤上放着。Spark 意識到在處理數據規模遠遠大於內存空間時所帶來的不足,引入了一個具備外部排序的方案。Shuffle 過來的數據先放在內存中,當內存中存儲的 <key, value> 對超過 1000 而且內存使用超過 70% 時,判斷節點上可用內存若是還足夠,則把內存緩衝區大小翻倍,若是可用內存再也不夠了,則把內存中的 <key, value> 對排序而後寫到磁盤文件中。最後把內存緩衝區中的數據排序以後和那些磁盤文件組成一個最小堆,每次從最小堆中讀取最小的數據,這個和 MapReduce 中的 merge 過程相似。

12.3 SortShuffle 過程介紹

  從 1.2.0 開始默認爲 sort shuffle(spark.shuffle.manager = sort),實現邏輯相似於 Hadoop MapReduce,Hash Shuffle 每個 reducers 產生一個文件,可是 Sort Shuffle 只是產生一個按照 reducer id 排序可索引的文件,這樣,只需獲取有關文件中的相關數據塊的位置信息,並 fseek 就能夠讀取指定 reducer 的數據。但對於 rueducer 數比較少的狀況,Hash Shuffle 明顯要比 Sort Shuffle 快,所以 Sort Shuffle 有個 「fallback」 計劃,對於 reducers 數少於 「spark.shuffle.sort.bypassMergeThreshold」 (200 by default),咱們使用 fallback 計劃,hashing 相關數據到分開的文件,而後合併這些文件爲一個,具體實現爲 BypassMergeSortShuffleWriter。
  

  在 map 進行排序,在 reduce 端應用 Timsort[1] 進行合併。map 端是否允許 spill,經過 spark.shuffle.spill 來設置,默認是 true。設置爲 false,若是沒有足夠的內存來存儲 map 的輸出,那麼就會致使 OOM 錯誤,所以要慎用。
  用於存儲 map 輸出的內存爲:「JVM Heap Size」 * spark.shuffle.memoryFraction * spark.shuffle.safetyFraction,默認爲: 「JVM Heap Size」 * 0.2 * 0.8 = 「JVM Heap Size」 * 0.16。若是你在同一個執行程序中運行多個線程(設定 spark.executor.cores/ spark.task.cpus 超過 1),每一個 map 任務存儲的空間爲 「JVM Heap Size」 * spark.shuffle.memoryFraction * spark.shuffle.safetyFraction / spark.executor.cores * spark.task.cpus,默認 2 個 cores,那麼爲 0.08 * 「JVM Heap Size」。
  

  spark 使用 AppendOnlyMap 存儲 map 輸出的數據,利用開源 hash 函數 MurmurHash3 和平方探測法把 key 和 value 保存在相同的 array 中。這種保存方法能夠是 spark 進行 combine。若是 spill 爲 true,會在 spill 前 sort。
  與 hash shuffle 相比,sort shuffle 中每一個 Mapper 只產生一個數據文件和一個索引文件,數據文件中的數據按照 Reducer 排序,但屬於同一個 Reducer 的數據不排序。Mapper 產生的數據先放到 AppendOnlyMap 這個數據結構中,若是內存不夠,數據則會 spill 到磁盤,最後合併成一個文件。
  與 Hash shuffle 相比,shuffle 文件數量減小,內存使用更加可控。但排序會影響速度。
優勢
  1)map 建立文件量較少。
  2)少許的 IO 隨機操做,大部分是順序讀寫。
缺點
  1)要比 Hash Shuffle 要慢,須要本身經過 spark.shuffle.sort.bypassMergeThreshold 來設置合適的值。
  2)若是使用 SSD 盤存儲 shuffle 數據,那麼 Hash Shuffle 可能更合適。

12.4 TungstenShuffle 過程介紹

  Tungsten-sort 算不得一個全新的 shuffle 方案,它在特定場景下基於相似現有的 Sort Based Shuffle 處理流程,對內存 /CPU/Cache 使用作了很是大的優化。帶來高效的同時,也就限定了本身的使用場景。若是 Tungsten-sort 發現本身沒法處理,則會自動使用 Sort Based Shuffle 進行處理。Tungsten 中文是鎢絲的意思。 Tungsten Project 是 Databricks 公司提出的對 Spark 優化內存和 CPU 使用的計劃,該計劃初期彷佛對 Spark SQL 優化的最多。不過部分 RDD API 還有 Shuffle 也所以受益。
Tungsten-sort 優化點主要在三個方面:
  1)直接在 serialized binary data 上 sort 而不是 java objects,減小了 memory 的開銷和 GC 的 overhead。
  2)提供 cache-efficient sorter,使用一個 8bytes 的指針,把排序轉化成了一個指針數組的排序。
  3)spill 的 merge 過程也無需反序列化便可完成。

  這些優化的實現致使引入了一個新的內存管理模型,相似 OS 的 Page,對應的實際數據結構爲 MemoryBlock,支持 off-heap 以及 in-heap 兩種模式。爲了可以對 Record 在這些 MemoryBlock 進行定位,引入了 Pointer(指針)的概念。
若是你還記得 Sort Based Shuffle 裏存儲數據的對象 PartitionedAppendOnlyMap,這是一個放在 JVM heap 裏普通對象,在 Tungsten-sort 中,他被替換成了相似操做系統內存頁的對象。若是你沒法申請到新的 Page,這個時候就要執行 spill 操做,也就是寫入到磁盤的操做。具體觸發條件,和 Sort Based Shuffle 也是相似的。
  Spark 默認開啓的是 Sort Based Shuffle,想要打開 Tungsten-sort,請設置
  spark.shuffle.manager=tungsten-sort
  對應的實現類是:org.apache.spark.shuffle.unsafe.UnsafeShuffleManager
  名字的來源是由於使用了大量 JDK Sun Unsafe API。

當且僅當下面條件都知足時,纔會使用新的 Shuffle 方式:
  1)Shuffle dependency 不能帶有 aggregation 或者輸出須要排序
  2)Shuffle 的序列化器須要是 KryoSerializer 或者 Spark SQL's 自定義的一些序列化方式.
  3)Shuffle 文件的數量不能大於 16777216。
  4)序列化時,單條記錄不能大於 128 MB。
能夠看到,能使用的條件仍是挺苛刻的。
這些限制來源於哪裏
參看以下代碼,page 的大小:
  this.pageSizeBytes = (int) Math.min(PackedRecordPointer.MAXIMUM_PAGE_SIZE_BYTES,shuffleMemoryManager.pageSizeBytes());
這就保證了頁大小不超過 PackedRecordPointer.MAXIMUM_PAGE_SIZE_BYTES 的值,該值就被定義成了 128M。
而產生這個限制的具體設計緣由,咱們還要仔細分析下 Tungsten 的內存模型,以下圖所示:
  

這張圖其實畫的是 on-heap 的內存邏輯圖,其中 #Page 部分爲 13bit,Offset 爲 51bit,你會發現 2^51 >> 128M 的。可是在 Shuffle 的過程當中,對 51bit 作了壓縮,使用了 27bit,具體以下:
  [24 bit partition number][13 bit memory page number][27 bit offset in page]
這裏預留出的 24bi t給了 partition number,爲了後面的排序用。上面的好幾個限制其實都是由於這個指針引發的:
  第一個是 partition 的限制,前面的數字 16777216 就是來源於 partition number 使用 24bit 表示的。
  第二個是 page number。
  第三個是偏移量,最大能表示到 2^27=128M。那一個 Task 能管理到的內存是受限於這個指針的,最可能是 2^13 * 128M 也就是 1TB 左右。

有了這個指針,咱們就能夠定位和管理到 off-heap 或者 on-heap 裏的內存了。這個模型仍是很漂亮的,內存管理也很是高效,記得以前的預估 PartitionedAppendOnlyMap 的內存是很是困難的,可是經過如今的內存管理機制,是很是快速而且精確的。
對於第一個限制,那是由於後續 Shuffle Write 的 sort 部分,只對前面 24bit 的 partiton number 進行排序,key 的值沒有被編碼到這個指針,因此沒辦法進行 ordering。
同時,由於整個過程是追求不反序列化的,因此不能作 aggregation。

Shuffle Write
核心類:
  org.apache.spark.shuffle.unsafe.UnsafeShuffleWriter
數據會經過 UnsafeShuffleExternalSorter.insertRecordIntoSorter 一條一條寫入到 serOutputStream 序列化輸出流。
這裏消耗內存的地方是
  serBuffer = new MyByteArrayOutputStream(1024 * 1024)
默認是 1M,相似於 Sort Based Shuffle 中的 ExternalSorter,在 Tungsten Sort 對應的爲 UnsafeShuffleExternalSorter,記錄序列化後就經過 sorter.insertRecord 方法放到 sorter 裏去了。
這裏 sorter 負責申請 Page,釋放 Page,判斷是否要進行 spill 都這個類裏完成。代碼的架子其實和 Sort Based 是同樣的。
  

(另外,值得注意的是,這張圖裏進行 spill 操做的同時檢查內存可用而致使的 Exeception 的 bug 已經在 1.5.1 版本被修復了,忽略那條路徑)
內存是否充足的條件依然 shuffleMemoryManager 來決定,也就是全部 Task Shuffle 申請的 Page 內存總和不能大於下面的值:
  ExecutorHeapMemeory * 0.2 * 0.8
上面的數字可經過下面兩個配置來更改:
  spark.shuffle.memoryFraction=0.2
  spark.shuffle.safetyFraction=0.8
UnsafeShuffleExternalSorter 負責申請內存,而且會生成該條記錄最後的邏輯地址,也就前面提到的 Pointer。
接着 Record 會繼續流轉到 UnsafeShuffleInMemorySorter 中,這個對象維護了一個指針數組:
  private long[] pointerArray;
數組的初始大小爲 4096,後續若是不夠了,則按每次兩倍大小進行擴充。
假設 100 萬條記錄,那麼該數組大約是 8M 左右,因此其實仍是很小的。一旦 spill 後該 UnsafeShuffleInMemorySorter 就會被賦爲 null,被回收掉。
咱們回過頭來看 spill,其實邏輯上也異常簡單了,UnsafeShuffleInMemorySorter 會返回一個迭代器,該迭代器粒度每一個元素就是一個指針,而後到根據該指針能夠拿到真實的 record,而後寫入到磁盤,由於這些 record 在一開始進入 UnsafeShuffleExternalSorter 就已經被序列化了,因此在這裏就純粹變成寫字節數組了。造成的結構依然和 Sort Based Shuffle 一致,一個文件裏不一樣的 partiton 的數據用 fileSegment 來表示,對應的信息存在一個 index 文件裏。
另外寫文件的時候也須要一個 buffer:
  spark.shuffle.file.buffer=32k
另外從內存裏拿到數據放到 DiskWriter,這中間還要有個中轉,是經過:
  final byte[] writeBuffer = new byte[DISK_WRITE_BUFFER_SIZE=1024 * 1024];
來完成的,都是內存,因此很快。
Task 結束前,咱們要作一次 mergeSpills 操做,而後造成一個 shuffle 文件。這裏面其實也挺複雜的,
若是開啓了
  spark.shuffle.unsafe.fastMergeEnabled=true
而且沒有開啓
  spark.shuffle.compress=true
或者壓縮方式爲:
  LZFCompressionCodec
則能夠很是高效的進行合併,叫作 transferTo。不過不管是什麼合併,都不須要進行反序列化。

Shuffle Read
Shuffle Read 徹底複用 HashShuffleReader,具體參看 Sort-Based Shuffle。

12.5 MapReduce 與 Spark 過程對比

MapReduce 和 Spark 的 Shuffle 過程對好比下:

第13章 Spark 內存管理

  Spark 做爲一個基於內存的分佈式計算引擎,其內存管理模塊在整個系統中扮演着很是重要的角色。理解 Spark 內存管理的基本原理,有助於更好地開發 Spark 應用程序和進行性能調優。本文中闡述的原理基於 Spark 2.1 版本。
  在執行 Spark 的應用程序時,Spark 集羣會啓動 Driver 和 Executor 兩種 JVM 進程,前者爲主控進程,負責建立 Spark 上下文,提交 Spark 做業(Job),並將做業轉化爲計算任務(Task),在各個 Executor 進程間協調任務的調度,後者負責在工做節點上執行具體的計算任務,並將結果返回給 Driver,同時爲須要持久化的 RDD 提供存儲功能。因爲 Driver 的內存管理相對來講較爲簡單,本文主要對 Executor 的內存管理進行分析,下文中的 Spark 內存均特指 Executor 的內存

13.1 堆內和堆外內存規劃

  做爲一個 JVM 進程,Executor 的內存管理創建在 JVM 的內存管理之上,Spark 對 JVM 的堆內(On-heap)空間進行了更爲詳細的分配,以充分利用內存。同時,Spark 引入了堆外(Off-heap)內存,使之能夠直接在工做節點的系統內存中開闢空間,進一步優化了內存的使用。堆內和堆外內存示意圖以下:

13.1.1 堆內內存

  堆內內存的大小,由 Spark 應用程序啓動時的 -executor-memoryspark.executor.memory 參數配置。Executor 內運行的併發任務共享 JVM 堆內內存,這些任務在緩存 RDD 數據和廣播(Broadcast)數據時佔用的內存被規劃爲存儲(Storage)內存,而這些任務在執行 Shuffle 時佔用的內存被規劃爲執行(Execution)內存,剩餘的部分不作特殊規劃,那些 Spark 內部的對象實例,或者用戶定義的 Spark 應用程序中的對象實例,均佔用剩餘的空間。不一樣的管理模式下,這三部分佔用的空間大小各不相同(下面第 2 小節會進行介紹)。
  Spark 對堆內內存的管理是一種邏輯上的規劃式的管理,由於對象實例佔用內存的申請和釋放都由 JVM 完成,Spark 只能在申請後和釋放前記錄這些內存,咱們來看其具體流程:
申請內存
  1)Spark 在代碼中 new 一個對象實例
  2)JVM 從堆內內存分配空間,建立對象並返回對象引用
  3)Spark 保存該對象的引用,記錄該對象佔用的內存
釋放內存
  1)Spark 記錄該對象釋放的內存,刪除該對象的引用
  2)等待 JVM 的垃圾回收機制釋放該對象佔用的堆內內存

  咱們知道,JVM 的對象能夠以序列化的方式存儲,序列化的過程是將對象轉換爲二進制字節流,本質上能夠理解爲將非連續空間的鏈式存儲轉化爲連續空間或塊存儲,在訪問時則須要進行序列化的逆過程--反序列化,將字節流轉化爲對象,序列化的方式能夠節省存儲空間,但增長了存儲和讀取時候的計算開銷。
  對於 Spark 中序列化的對象,因爲是字節流的形式,其佔用的內存大小可直接計算,而對於非序列化的對象,其佔用的內存是經過週期性地採樣近似估算而得,即並非每次新增的數據項都會計算一次佔用的內存大小,這種方法下降了時間開銷可是有可能偏差較大,致使某一時刻的實際內存有可能遠遠超出預期。此外,在被 Spark 標記爲釋放的對象實例,頗有可能在實際上並無被 JVM 回收,致使實際可用的內存小於 Spark 記錄的可用內存。因此 Spark 並不能準確記錄實際可用的堆內內存,從而也就沒法徹底避免內存溢出(OOM, Out of Memory)的異常。
  雖然不能精準控制堆內內存的申請和釋放,但 Spark 經過對存儲內存和執行內存各自獨立的規劃管理,能夠決定是否要在存儲內存裏緩存新的 RDD,以及是否爲新的任務分配執行內存,在必定程度上能夠提高內存的利用率,減小異常的出現。

13.1.2 堆外內存

  爲了進一步優化內存的使用以及提升 Shuffle 時排序的效率,Spark 引入了堆外(Off-heap)內存,使之能夠直接在工做節點的系統內存中開闢空間,存儲通過序列化的二進制數據。利用 JDK Unsafe API(從 Spark 2.0 開始,在管理堆外的存儲內存時再也不基於 Tachyon,而是與堆外的執行內存同樣,基於 JDK Unsafe API 實現),Spark 能夠直接操做系統堆外內存,減小了沒必要要的內存開銷,以及頻繁的 GC 掃描和回收,提高了處理性能。堆外內存能夠被精確地申請和釋放,並且序列化的數據佔用的空間能夠被精確計算,因此相比堆內內存來講下降了管理的難度,也下降了偏差。
  在默認狀況下堆外內存並不啓用,可經過配置 spark.memory.offHeap.enabled 參數啓用,並由 spark.memory.offHeap.size 參數設定堆外空間的大小。除了沒有 other 空間,堆外內存與堆內內存的劃分方式相同,全部運行中的併發任務共享存儲內存和執行內存。

13.1.3 內存管理接口

  Spark 爲存儲內存和執行內存的管理提供了統一的接口--MemoryManager,同一個 Executor 內的任務都調用這個接口的方法來申請或釋放內存:

內存管理接口的主要方法:

// 申請存儲內存
def acquireStorageMemory(blockId: BlockId, numBytes: Long, memoryMode: MemoryMode): Boolean
// 申請展開內存
def acquireUnrollMemory(blockId: BlockId, numBytes: Long, memoryMode: MemoryMode): Boolean
// 申請執行內存
def acquireExecutionMemory(numBytes: Long, taskAttemptId: Long, memoryMode: MemoryMode): Long
// 釋放存儲內存
def releaseStorageMemory(numBytes: Long, memoryMode: MemoryMode): Unit
// 釋放執行內存
def releaseExecutionMemory(numBytes: Long, taskAttemptId: Long, memoryMode: MemoryMode): Unit
// 釋放展開內存
def releaseUnrollMemory(numBytes: Long, memoryMode: MemoryMode): Unit

Spark的內存管理 – 內存管理接口

  咱們看到,在調用這些方法時都須要指定其內存模式(MemoryMode),這個參數決定了是在堆內仍是堆外完成此次操做。MemoryManager 的具體實現上,Spark 1.6 以後默認爲統一管理(Unified Memory Manager)方式,1.6 以前採用的靜態管理(Static Memory Manager)方式仍被保留,可經過配置 spark.memory.useLegacyMode 參數啓用。兩種方式的區別在於對空間分配的方式,下面的第 2 小節會分別對這兩種方式進行介紹。

13.2 內存空間分配

13.2.1 靜態內存管理

  在 Spark 最初採用的靜態內存管理機制下,存儲內存、執行內存和其餘內存的大小在 Spark 應用程序運行期間均爲固定的,但用戶能夠應用程序啓動前進行配置。

靜態內存管理圖示--堆內


能夠看到,可用的堆內內存的大小須要按照下面的方式計算:

可用堆內內存空間:

可用的存儲內存 = systemMaxMemory * spark.storage.memoryFraction * spark.storage.safetyFraction
可用的執行內存 = systemMaxMemory * spark.shuffle.memoryFraction * spark.shuffle.safetyFraction

  其中 systemMaxMemory 取決於當前 JVM 堆內內存的大小,最後可用的執行內存或者存儲內存要在此基礎上與各自的 memoryFraction 參數和 safetyFraction 參數相乘得出。上述計算公式中的兩個 safetyFraction 參數,其意義在於在邏輯上預留出 1-safetyFraction 這麼一塊保險區域,下降因實際內存超出當前預設範圍而致使 OOM 的風險(上文提到,對於非序列化對象的內存採樣估算會產生偏差)。值得注意的是,這個預留的保險區域僅僅是一種邏輯上的規劃,在具體使用時 Spark 並無區別對待,和 「其它內存」 同樣交給了 JVM 去管理。

  堆外的空間分配較爲簡單,只有存儲內存和執行內存,以下圖所示。可用的執行內存和存儲內存佔用的空間大小直接由參數 spark.memory.storageFraction 決定,因爲堆外內存佔用的空間能夠被精確計算,因此無需再設定保險區域。

靜態內存管理圖示--堆外

  靜態內存管理機制實現起來較爲簡單,但若是用戶不熟悉 Spark 的存儲機制,或沒有根據具體的數據規模和計算任務或作相應的配置,很容易形成 「一半海水,一半火焰」 的局面,即存儲內存和執行內存中的一方剩餘大量的空間,而另外一方卻早早被佔滿,不得不淘汰或移出舊的內容以存儲新的內容。因爲新的內存管理機制的出現,這種方式目前已經不多有開發者使用,出於兼容舊版本的應用程序的目的,Spark 仍然保留了它的實現。

13.2.2 統一內存管理

  Spark 1.6 以後引入的統一內存管理機制,與靜態內存管理的區別在於存儲內存和執行內存共享同一塊空間,能夠動態佔用對方的空閒區域。

統一內存管理圖示--堆內

統一內存管理圖示--堆外

其中最重要的優化在於動態佔用機制,其規則以下:
  1)設定基本的存儲內存和執行內存區域(spark.storage.storageFraction 參數),該設定肯定了雙方各自擁有的空間的範圍。
  2)雙方的空間都不足時,則存儲到硬盤;若己方空間不足而對方空餘時,可借用對方的空間;(存儲空間不足是指不足以放下一個完整的 Block)。
  3)執行內存的空間被對方佔用後,可以讓對方將佔用的部分轉存到硬盤,而後 「歸還」 借用的空間。
  4)存儲內存的空間被對方佔用後,沒法讓對方 「歸還」,由於須要考慮 Shuffle 過程當中的不少因素,實現起來較爲複雜。

動態佔用機制圖示

  憑藉統一內存管理機制,Spark 在必定程度上提升了堆內和堆外內存資源的利用率,下降了開發者維護 Spark 內存的難度,但並不意味着開發者能夠高枕無憂。譬如,因此若是存儲內存的空間太大或者說緩存的數據過多,反而會致使頻繁的全量垃圾回收,下降任務執行時的性能,由於緩存的 RDD 數據一般都是長期駐留內存的。因此要想充分發揮 Spark 的性能,須要開發者進一步瞭解存儲內存和執行內存各自的管理方式和實現原理。

13.3 存儲內存管理

13.3.1 RDD 的持久化機制

  彈性分佈式數據集(RDD)做爲 Spark 最根本的數據抽象,是隻讀的分區記錄(Partition)的集合,只能基於在穩定物理存儲中的數據集上建立,或者在其餘已有的 RDD 上執行轉換(Transformation)操做產生一個新的 RDD。轉換後的 RDD 與原始的 RDD 之間產生的依賴關係,構成了血統(Lineage)。憑藉血統,Spark 保證了每個 RDD 均可以被從新恢復。但 RDD 的全部轉換都是惰性的,即只有當一個返回結果給 Driver 的行動(Action)發生時,Spark 纔會建立任務讀取 RDD,而後真正觸發轉換的執行。
  Task 在啓動之初讀取一個分區時,會先判斷這個分區是否已經被持久化,若是沒有則須要檢查 Checkpoint 或按照血統從新計算。因此若是一個 RDD 上要執行屢次行動,能夠在第一次行動中使用 persist 或 cache 方法,在內存或磁盤中持久化或緩存這個 RDD,從而在後面的行動時提高計算速度。事實上,cache 方法是使用默認的 MEMORY_ONLY 的存儲級別將 RDD 持久化到內存,故緩存是一種特殊的持久化。堆內和堆外存儲內存的設計,即可以對緩存 RDD 時使用的內存作統一的規劃和管理(存儲內存的其餘應用場景,如緩存 broadcast 數據,暫時不在本文的討論範圍以內)。
  RDD 的持久化由 Spark 的 Storage 模塊負責,實現了 RDD 與物理存儲的解耦合。Storage 模塊負責管理 Spark 在計算過程當中產生的數據,將那些在內存或磁盤、在本地或遠程存取數據的功能封裝了起來。在具體實現時 Driver 端和 Executor 端的 Storage 模塊構成了主從式的架構,即 Driver 端的 BlockManager 爲 Master,Executor 端的 BlockManager 爲 Slave。Storage 模塊在邏輯上以 Block 爲基本存儲單位,RDD 的每一個 Partition 通過處理後惟一對應一個 Block(BlockId 的格式爲 rdd_RDD-ID_PARTITION-ID )。Master 負責整個 Spark 應用程序的 Block 的元數據信息的管理和維護,而 Slave 須要將 Block 的更新等狀態上報到 Master,同時接收 Master 的命令,例如新增或刪除一個 RDD。

Storage 模塊示意圖

在對 RDD 持久化時,Spark 規定了 MEMORY_ONLY、MEMORY_AND_DISK 等 7 種不一樣的 存儲級別,而存儲級別是如下 5 個變量的組合:
存儲級別

class StorageLevel private(
  private var _useDiskBoolean,        // 磁盤
  private var _useMemoryBoolean,      // 這裏實際上是指堆內內存
  private var _useOffHeapBoolean,     // 堆外內存
  private var _deserializedBoolean,   // 是否爲非序列化
  private var _replicationInt 
1     // 副本個數
)

經過對數據結構的分析,能夠看出存儲級別從三個維度定義了 RDD 的 Partition(同時也就是 Block)的存儲方式:

    1)存儲位置:磁盤/堆內內存/堆外內存。如 MEMORY_AND_DISK 是同時在磁盤和堆內內存上存儲,實現了冗餘備份。OFF_HEAP 則是隻在堆外內存存儲,目前選擇堆外內存時不能同時存儲到其餘位置。
    2)存儲形式:Block 緩存到存儲內存後,是否爲非序列化的形式。如 MEMORY_ONLY 是非序列化方式存儲,OFF_HEAP 是序列化方式存儲。
    3)副本數量:大於 1 時須要遠程冗餘備份到其餘節點。如 DISK_ONLY_2 須要遠程備份 1 個副本。

13.3.2 RDD 緩存的過程

  RDD 在緩存到存儲內存以前,Partition 中的數據通常以迭代器(Iterator)的數據結構來訪問,這是 Scala 語言中一種遍歷數據集合的方法。經過 Iterator 能夠獲取分區中每一條序列化或者非序列化的數據項 (Record),這些 Record 的對象實例在邏輯上佔用了 JVM 堆內內存的 other 部分的空間,同一 Partition 的不一樣 Record 的空間並不連續。
  RDD 在緩存到存儲內存以後,Partition 被轉換成 Block,Record 在堆內或堆外存儲內存中佔用一塊連續的空間。將 Partition 由不連續的存儲空間轉換爲連續存儲空間的過程,Spark 稱之爲 「展開」(Unroll)。Block 有序列化和非序列化兩種存儲格式,具體以哪一種方式取決於該 RDD 的存儲級別。非序列化的 Block 以一種 DeserializedMemoryEntry 的數據結構定義,用一個數組存儲全部的對象實例,序列化的 Block 則以 SerializedMemoryEntry 的數據結構定義,用字節緩衝區(ByteBuffer)來存儲二進制數據。每一個 Executor 的 Storage 模塊用一個鏈式 Map 結構(LinkedHashMap)來管理堆內和堆外存儲內存中全部的 Block 對象的實例,對這個 LinkedHashMap 新增和刪除間接記錄了內存的申請和釋放。
  由於不能保證存儲空間能夠一次容納 Iterator 中的全部數據,當前的計算任務在 Unroll 時要向 MemoryManager 申請足夠的 Unroll 空間來臨時佔位,空間不足則 Unroll 失敗,空間足夠時能夠繼續進行。對於序列化的 Partition,其所需的 Unroll 空間能夠直接累加計算,一次申請。而非序列化的 Partition 則要在遍歷 Record 的過程當中依次申請,即每讀取一條 Record,採樣估算其所需的 Unroll 空間並進行申請,空間不足時能夠中斷,釋放已佔用的 Unroll 空間。若是最終 Unroll 成功,當前 Partition 所佔用的 Unroll 空間被轉換爲正常的緩存 RDD 的存儲空間,以下圖所示。

Spark Unroll 示意圖

  在靜態內存管理時,Spark 在存儲內存中專門劃分了一塊 Unroll 空間,其大小是固定的,統一內存管理時則沒有對 Unroll 空間進行特別區分,當存儲空間不足時會根據動態佔用機制進行處理。

13.3.3 淘汰和落盤

  因爲同一個 Executor 的全部的計算任務共享有限的存儲內存空間,當有新的 Block 須要緩存可是剩餘空間不足且沒法動態佔用時,就要對 LinkedHashMap 中的舊 Block 進行淘汰(Eviction),而被淘汰的 Block 若是其存儲級別中同時包含存儲到磁盤的要求,則要對其進行落盤(Drop),不然直接刪除該 Block。
  存儲內存的淘汰規則爲:
  1)被淘汰的舊 Block 要與新 Block 的 MemoryMode 相同,即同屬於堆外或堆內內存。
  2)新舊 Block 不能屬於同一個 RDD,避免循環淘汰。
  3)舊 Block 所屬 RDD 不能處於被讀狀態,避免引起一致性問題。
  4)遍歷 LinkedHashMap 中 Block,按照最近最少使用(LRU)的順序淘汰,直到知足新 Block 所需的空間。其中 LRU 是 LinkedHashMap 的特性。
  落盤的流程則比較簡單,若是其存儲級別符合_useDisk 爲 true 的條件,再根據其 _deserialized 判斷是不是非序列化的形式,如果則對其進行序列化,最後將數據存儲到磁盤,在 Storage 模塊中更新其信息。

13.4 執行內存管理

13.4.1 多任務間內存分配

  Executor 內運行的任務一樣共享執行內存,Spark 用一個 HashMap 結構保存了任務到內存耗費的映射。每一個任務可佔用的執行內存大小的範圍爲 1/2N ~ 1/N,其中 N 爲當前 Executor 內正在運行的任務的個數。每一個任務在啓動之時,要向 MemoryManager 請求申請最少爲 1/2N 的執行內存,若是不能被知足要求則該任務被阻塞,直到有其餘任務釋放了足夠的執行內存,該任務才能夠被喚醒。

13.4.2 Shuffle 的內存佔用

  執行內存主要用來存儲任務在執行 Shuffle 時佔用的內存,Shuffle 是按照必定規則對 RDD 數據從新分區的過程,咱們來看 Shuffle 的 Write 和 Read 兩階段對執行內存的使用:
Shuffle Write
  1)若在 map 端選擇普通的排序方式,會採用 ExternalSorter 進行外排,在內存中存儲數據時主要佔用堆內執行空間。
  2)若在 map 端選擇 Tungsten 的排序方式,則採用 ShuffleExternalSorter 直接對以序列化形式存儲的數據排序,在內存中存儲數據時能夠佔用堆外或堆內執行空間,取決於用戶是否開啓了堆外內存以及堆外執行內存是否足夠。
Shuffle Read
  1)在對 reduce 端的數據進行聚合時,要將數據交給 Aggregator 處理,在內存中存儲數據時佔用堆內執行空間。
  2)若是須要進行最終結果排序,則要將再次將數據交給 ExternalSorter 處理,佔用堆內執行空間。

  在 ExternalSorter 和 Aggregator 中,Spark 會使用一種叫 AppendOnlyMap 的哈希表在堆內執行內存中存儲數據,但在 Shuffle 過程當中全部數據並不能都保存到該哈希表中,當這個哈希表佔用的內存會進行週期性地採樣估算,當其大到必定程度,沒法再從 MemoryManager 申請到新的執行內存時,Spark 就會將其所有內容存儲到磁盤文件中,這個過程被稱爲溢存(Spill),溢存到磁盤的文件最後會被歸併(Merge)。
  Shuffle Write 階段中用到的 Tungsten 是 Databricks 公司提出的對 Spark 優化內存和 CPU 使用的計劃,解決了一些 JVM 在性能上的限制和弊端。Spark 會根據 Shuffle 的狀況來自動選擇是否採用 Tungsten 排序。Tungsten 採用的頁式內存管理機制創建在 MemoryManager 之上,即 Tungsten 對執行內存的使用進行了一步的抽象,這樣在 Shuffle 過程當中無需關心數據具體存儲在堆內仍是堆外。每一個內存頁用一個 MemoryBlock 來定義,並用 Object obj 和 long offset 這兩個變量統一標識一個內存頁在系統內存中的地址。堆內的 MemoryBlock 是以 long 型數組的形式分配的內存,其 obj 的值爲是這個數組的對象引用,offset 是 long 型數組的在 JVM 中的初始偏移地址,二者配合使用能夠定位這個數組在堆內的絕對地址;堆外的 MemoryBlock 是直接申請到的內存塊,其 obj 爲 null,offset 是這個內存塊在系統內存中的 64 位絕對地址。Spark 用 MemoryBlock 巧妙地將堆內和堆外內存頁統一抽象封裝,並用頁表(pageTable)管理每一個 Task 申請到的內存頁。

  Tungsten 頁式管理下的全部內存用 64 位的邏輯地址表示,由頁號和頁內偏移量組成:
  頁號:佔 13 位,惟一標識一個內存頁,Spark 在申請內存頁以前要先申請空閒頁號。
  頁內偏移量:佔 51 位,是在使用內存頁存儲數據時,數據在頁內的偏移地址。
  有了統一的尋址方式,Spark 能夠用 64 位邏輯地址的指針定位到堆內或堆外的內存,整個 Shuffle Write 排序的過程只須要對指針進行排序,而且無需反序列化,整個過程很是高效,對於內存訪問效率和 CPU 使用效率帶來了明顯的提高。

  Spark 的存儲內存和執行內存有着大相徑庭的管理方式:對於存儲內存來講,Spark 用一個 LinkedHashMap 來集中管理全部的 Block,Block 由須要緩存的 RDD 的 Partition 轉化而成;而對於執行內存,Spark 用 AppendOnlyMap 來存儲 Shuffle 過程當中的數據,在 Tungsten 排序中甚至抽象成爲頁式內存管理,開闢了全新的 JVM 內存管理機制。

第14章 部署模式解析

14.1 部署模式概述

  Spark 支持的主要的三種分佈式部署方式分別是 standalone、spark on mesos 和 spark on YARN。standalone 模式,即獨立模式,自帶完整的服務,可單獨部署到一個集羣中,無需依賴任何其餘資源管理系統。它是 Spark 實現的資源調度框架,其主要的節點有 Client 節點、Master 節點和 Worker 節點。而 yarn 是統一的資源管理機制,在上面能夠運行多套計算框架,如 map reduce、storm 等根據 driver 在集羣中的位置不一樣,分爲 yarn client 和 yarn cluster。而 mesos 是一個更強大的分佈式資源管理框架,它容許多種不一樣的框架部署在其上,包括 yarn。基本上,Spark 的運行模式取決於傳遞給 SparkContext 的 MASTER 環境變量的值,個別模式還須要輔助的程序接口來配合使用,目前支持的 Master 字符串及 URL 包括:
  


  用戶在提交任務給 Spark 處理時,如下兩個參數共同決定了 Spark 的運行方式:
  • --master MASTER_URL :決定了 Spark 任務提交給哪一種集羣處理。
  • --deploy-mode DEPLOY_MODE :決定了 Driver 的運行方式,可選值爲 Client 或者 Cluster。

 

14.2 standalone 框架

  standalone 集羣由三個不一樣級別的節點組成,分別是:
  1)Master 主控節點,能夠類比爲董事長或總舵主,在整個集羣之中,最多隻有一個 Master 處在 Active 狀態。
  2)Worker 工做節點,這個是 manager,是分舵主, 在整個集羣中,能夠有多個 Worker,若是 Worker 爲零,什麼事也作不了。
  3)Executor 幹苦力活的,直接受 Worker 掌控,一個 Worker 能夠啓動多個 executor,啓動的個數受限於機器中的 cpu 核數。
  這三種不一樣類型的節點各自運行於本身的JVM進程之中。

  Standalone 模式下,集羣啓動時包括 Master 與 Worker,其中 Master 負責接收客戶端提交的做業,管理 Worker。根據做業提交的方式不一樣,分爲 driver on client 和 drvier on worker。以下圖所示,上圖爲 driver on client 模式,下圖爲 driver on work 模式。兩種模式的主要不一樣點在於 driver 所在的位置。
  在 standalone 部署模式下又分爲 client 模式和 cluster 模式。
  在client 模式下,driver 和 client 運行於同一 JVM 中,不禁 worker 啓動,該 JVM 進程直到 spark application 計算完成返回結果後才退出。以下圖所示:
  


  而在 cluster 模式下,driver 由 worker 啓動,client 在確認 spark application 成功提交給 cluster 後直接退出,並不等待 spark application 運行結果返回。以下圖所示:
  

從部署圖來進行分析,每一個 JVM 進程在啓動時的文件依賴如何獲得知足。
  1)Master 進程最爲簡單,除了 spark jar 包以外,不存在第三方庫依賴。
  2)Driver 和 Executor 在運行的時候都有可能存在第三方包依賴,分開來說。
  3)Driver 比較簡單,spark-submit 在提交的時候會指定所要依賴的 jar 文件從哪裏讀取。
  4)Executor 由 Worker 來啓動,Worker 須要下載 Executor 啓動時所須要的 jar 文件,那麼從哪裏下載呢?


  Spark Standalone 模式,即獨立模式,自帶完整的服務,可單獨部署到一個集羣中,無需依賴其餘資源管理系統。在該模式下,用戶能夠經過手動啓動 Master 和 Worker 來啓動一個獨立的集羣。其中,Master 充當了資源管理的角色,Workder 充當了計算節點的角色。在該模式下,Spark Driver 程序在客戶端(Client)運行,而 Executor 則在 Worker 節點上運行。如下是一個運行在 Standalone 模式下,包含一個 Master 節點,兩個 Worker 節點的 Spark 任務調度交互部署架構圖。

從上面的 Spark 任務調度過程能夠看到:
  1)整個集羣分爲 Master 節點和 Worker 節點,其 Driver 程序運行在客戶端。Master 節點負責爲任務分配 Worker 節點上的計算資源,二者會經過相互通訊來同步資源狀態,見途中紅色雙向箭頭。
  2)客戶端啓動任務後會運行 Driver 程序,Driver 程序中會完成 SparkContext 對象的初始化,並向 Master 進行註冊。
  3)每一個 Workder 節點上會存在一個或者多個 ExecutorBackend 進程。每一個進程包含一個 Executor 對象,該對象持有一個線程池,每一個線程池能夠執行一個任務(Task)。ExecutorBackend 進程還負責跟客戶端節點上的 Driver 程序進行通訊,上報任務狀態。

 

14.2.1 Standalone 模式下任務運行過程

  上面的過程反映了 Spark 在 standalone 模式下,總體上客戶端、Master 和 Workder 節點之間的交互。對於一個任務的具體運行過程須要更細緻的分解,分解運行過程見圖中的小字。
1) 用戶經過 bin/spark-submit 部署工具或者 bin/spark-class 啓動應用程序的 Driver 進程,Driver 進程會初始化 SparkContext 對象,並向 Master 節點進行註冊。
  • 一、Master 節點接受 Driver 程序的註冊,檢查它所管理的 Worker 節點,爲該 Driver 程序分配須要的計算資源 Executor。Worker 節點完成 Executor 的分配後,向 Master 報告 Executor 的狀態。
  • 二、Worker 節點上的 ExecutorBackend 進程啓動後,向 Driver 進程註冊。
2) Driver 進程內部經過 DAG Schaduler、Stage Schaduler、Task Schaduler 等過程完成任務的劃分後,向 Worker 節點上的 ExecutorBackend 分配 TASK。
  • 一、ExecutorBackend 進行 TASK 計算,並向 Driver 報告 TASK 狀態,直至結束。
  • 二、Driver 進程在全部 TASK 都處理完成後,向 Master 註銷。

14.2.2 總結

  Spark 可以以 standalone 模式運行,這是 Spark 自身提供的運行模式,用戶能夠經過手動啓動 master 和 worker 進程來啓動一個獨立的集羣,也能夠在一臺機器上運行這些守護進程進行測試。standalone 模式能夠用在生產環境,它有效的下降了用戶學習、測試 Spark 框架的成本。
  standalone 模式目前只支持跨應用程序的簡單 FIFO 調度。然而,爲了容許多個併發用戶,你能夠控制每一個應用使用的資源的最大數。默認狀況下,它會請求使用集羣的所有 CUP 內核。
  缺省狀況下,standalone 任務調度容許 worker 的失敗(在這種狀況下它能夠將失敗的任務轉移給其餘的 worker)。可是,調度器使用 master 來作調度,這會產生一個單點問題:若是 master 崩潰,新的應用不會被建立。爲了解決這個問題,能夠經過 zookeeper 的選舉機制在集羣中啓動多個 master,也能夠使用本地文件實現單節點恢復。

14.3 yarn 集羣模式

  Apache yarn 是 apache Hadoop 開源項目的一部分。設計之初是爲了解決 mapreduce 計算框架資源管理的問題。到 haodoop 2.0 使用 yarn 將 mapreduce 的分佈式計算和資源管理區分開來。它的引入使得 Hadoop 分佈式計算系統進入了平臺化時代,即各類計算框架能夠運行在一個集羣中,由資源管理系統 YRAN 進行統一的管理和調度,從而共享整個集羣資源、提升資源利用率。
  YARN 整體上也 Master/Slave 架構--ResourceManager/NodeManager。前者(RM)負責對各個 NodeManager(NM) 上的資源進行統一管理和調度。而 Container 是資源分配和調度的基本單位,其中封裝了機器資源,如內存、CPU、磁盤和網絡等,每一個任務會被分配一個 Container,該任務只能在該 Container 中執行,並使用該 Container 封裝的資源。NodeManager 的做用則是負責接收並啓動應用的 Container、而向 RM 回報本節點上的應用 Container 運行狀態和資源使用狀況。ApplicationMaster 與具體的 Application 相關,主要負責同 ResourceManager 協商以獲取合適的 Container,並跟蹤這些 Container 的狀態和監控其進度。以下圖所示爲 yarn 集羣的通常模型。
簡單架構圖以下:
  


詳細架構圖以下:
  

  Spark 在 yarn 集羣上的部署方式分爲兩種,yarn cluster(driver 運行在 master 上)和 yarn client(driver 運行在 client 上)。
  driver on master 以下圖所示:
  


  • (1) Spark Yarn Client 向 YARN 中提交應用程序,包括 Application Master 程序、啓動 Application Master 的命令、須要在 Executor 中運行的程序等。
  • (2) Resource manager 收到請求後,在其中一個 Node Manager 中爲應用程序分配一個 Container,要求它在 Container 中啓動應用程序的 Application Master,Application Master 初始化 sparkContext 以及建立 DAG Scheduler 和 Task Scheduler。
  • (3) Application Master 根據 SparkContext 中的配置,向 Resource Manager 申請 Container,同時,Application Master 向 Resource Manager 註冊,這樣用戶可經過 Resource Manager 查看應用程序的運行狀態。
  • (4) Resource Manager 在集羣中尋找符合條件的 Node Manager,在 Node Manager 啓動 Container,要求 Container 啓動 Executor。
  • (5) Executor 啓動後向 Application Master 註冊,並接收 Application Master 分配的 Task。
  • (6) 應用程序運行完成後,Application Master 向 Resource Manager 申請註銷並關閉本身。
   driver on client 以下圖所示:
  
  • (1) Spark Yarn Client 向 YARN 的 Resource Manager 申請啓動 Application Master。同時在 SparkContent 初始化中將建立 DAG Scheduler 和 Task Scheduler 等。
  • (2) ResourceManager 收到請求後,在集羣中選擇一個 NodeManager,爲該應用程序分配第一個 Container,要求它在這個 Container 中啓動應用程序的 ApplicationMaster,與 YARN-Cluster 區別的是在該 ApplicationMaster 不運行 SparkContext,只與 SparkContext 進行聯繫進行資源的分派。
  • (3) Client 中的 SparkContext 初始化完畢後,與 Application Master 創建通信,向 Resource Manager 註冊,根據任務信息向 Resource Manager 申請資源 (Container)。
  • (4) 當 Application Master 申請到資源後,便與 Node Manager 通訊,要求它啓動 Container。
  • (5) Container 啓動後向 Driver 中的 SparkContext 註冊,並申請 Task。
  • (6) 應用程序運行完成後,Client 的 SparkContext 向 ResourceManage r申請註銷並關閉本身。

 


  Yarn-client 和Yarn cluster 模式對比能夠看出,在 Yarn-client(Driver on client)中,Application Master 僅僅從 Yarn 中申請資源給 Executor,以後 client 會跟 container 通訊進行做業的調度。若是 client 離集羣距離較遠,建議不要採用此方式,不過此方式有利於交互式的做業。  
  


  Spark 可以以集羣的形式運行,可用的集羣管理系統有 Yarn、Mesos 等。集羣管理器的核心功能是資源管理和任務調度。以 Yarn 爲例,Yarn 以 Master/Slave 模式工做,在 Master 節點運行的是 Resource Manager(RM),負責管理整個集羣的資源和資源分配。在 Slave 節點運行的 Node Manager(NM),是集羣中實際擁有資源的工做節點。咱們提交 Job 之後,會將組成 Job 的多個 Task 調度到對應的 Node Manager 上進行執行。另外,在 Node Manager 上將資源以 Container 的形式進行抽象,Container 包括兩種資源 內存 和 CPU。
  如下是一個運行在 Yarn 集羣上,包含一個 Resource Manager 節點,三個 Node Manager 節點(其中,兩個是 Worker 節點,一個 Master 節點)的 Spark 任務調度交換部署架構圖。 

從上面的Spark任務調度過程圖能夠看到:
  1)整個集羣分爲 Master 節點和 Worker 節點,它們都存在於 Node Manager 節點上,在客戶端提交任務時由 Resource Manager 統一分配,運行 Driver 程序的節點被稱爲 Master 節點,執行具體任務的節點被稱爲 Worder 節點。Node Manager 節點上資源的變化都須要及時更新給 Resource Manager,見圖中紅色雙向箭頭。
  2)Master 節點上常駐 Master 守護進程 -- Driver 程序,Driver 程序中會建立 SparkContext對 象,並負責跟各個 Worker 節點上的 ExecutorBackend 進程進行通訊,管理 Worker 節點上的任務,同步任務進度。實際上,在 Yarn 中 Node Manager 之間的關係是平等的,所以 Driver 程序會被調度到任何一個 Node Manager 節點。
  3)每一個 Worker 節點上會存在一個或者多個 ExecutorBackend 進程。每一個進程包含一個 Executor 對象,該對象持有一個線程池,每一個線程池能夠執行一個任務(Task)。ExecutorBackend 進程還負責跟 Master 節點上的 Driver 程序進行通訊,上報任務狀態。  

 

集羣下任務運行過程

  上面的過程反映出了 Spark 在集羣模式下,總體上 Resource Manager 和 Node Manager 節點間的交互,Master 和 Worker 之間的交互。對於一個任務的具體運行過程須要更細緻的分解,分解運行過程見圖中的小字。
  • 1) 用戶經過 bin/spark-submit 部署工具或者 bin/spark-class 向 Yarn 集羣提交應用程序。
  • 2) Yarn 集羣的 Resource Manager 爲提交的應用程序選擇一個 Node Manager 節點並分配第一個 Container,並在該節點的 Container 上啓動 SparkContext 對象。
  • 3) SparkContext 對象向 Yarn 集羣的 Resource Manager 申請資源以運行 Executor。
  • 4) Yarn 集羣的 Resource Manager 分配 Container 給 SparkContext 對象,SparkContext 和相關的 Node Manager 通信,在得到的 Container 上啓動 ExecutorBackend 守護進程,ExecutorBackend 啓動後開始向 SparkContext 註冊並申請 Task。
  • 5) SparkContext 分配 Task 給 ExecutorBackend 執行。
  • 6) ExecutorBackend 開始執行 Task,並及時向 SparkContext 彙報運行情況。Task 運行完畢,SparkContext 歸還資源給 Node Manager,並註銷退。

14.4 mesos 集羣模式

  Mesos 是 apache 下的開源分佈式資源管理框架。起源於加州大學伯克利分校,後被 Twitter 推廣使用。Mesos 上能夠部署多種分佈式框架,Mesos 的架構圖以下圖所示,其中 Framework 是指外部的計算框架,如 Hadoop、Mesos 等,這些計算框架可經過註冊的方式接入 Mesos,以便 Mesos 進行統一管理和資源分配。
  


  在 Mesos 上運行的 Framework 由兩部分組成:一個是 scheduler ,經過註冊到 Master 來獲取集羣資源。另外一個是在 Slave 節點上運行的 executor 進程,它能夠執行 Framework 的 task 。 Master 決定爲每一個 Framework 提供多少資源,Framework 的 scheduler 來選擇其中提供的資源。當 Framework 贊成了提供的資源,它經過 Master 將 task 發送到提供資源的 Slaves 上運行。Mesos c的資源分配圖以下圖所示:
  
  (1) Slave1 向 Master 報告,有 4 個 CPU 和 4 GB 內存可用。
  (2) Master 發送一個 Resource Offer 給 Framework1 來描述 Slave1 有多少可用資源。
  (3) FrameWork1 中的 FW Scheduler 會答覆 Master,我有兩個 Task 須要運行在 Slave1,一個 Task 須要 <2個CPU,1 GB內存="">,另一個 Task 須要 <1個CPU,2 GB內存="">
  (4) 最後,Master 發送這些 Tasks 給 Slave1。而後,Slave1 還有 1 個 CPU 和 1GB 內存沒有使用,因此分配模塊能夠把這些資源提供給 Framework2。

 


  Spark 可做爲其中一個分佈式框架部署在 mesos 上,部署圖與 mesos 的通常框架部署圖相似,以下圖所示,這裏再也不重述。

14.5 spark 三種部署模式的區別

  在這三種部署模式中,standalone 做爲 spark 自帶的分佈式部署模式,是最簡單也是最基本的 spark 應用程序部署模式,這裏就再也不贅述。這裏就講一下 yarn 和 mesos 的區別:
  (1) 就兩種框架自己而言,mesos上可部署 yarn 框架。而 yarn 是更通用的一種部署框架,並且技術較成熟。
  (2) mesos 雙層調度機制,能支持多種調度模式,而 yarn 經過 Resource Mananger 管理集羣資源,只能使用一種調度模式。Mesos 的雙層調度機制爲:mesos 可接入如 yarn 通常的分佈式部署框架,但 Mesos 要求可接入的框架必須有一個調度器模塊,該調度器負責框架內部的任務調度。當一個 Framework 想要接入 mesos 時,須要修改本身的調度器,以便向 mesos 註冊,並獲取 mesos 分配給本身的資源,這樣再由本身的調度器將這些資源分配給框架中的任務,也就是說,整個 mesos 系統採用了雙層調度框架:第一層,由 mesos 將資源分配給框架;第二層,框架本身的調度器將資源分配給本身內部的任務。
  (3) mesos 可實現粗、細粒度資源調度,可動態分配資源,而 yarn 只能實現靜態資源分配。其中粗粒度和細粒度調度定義以下:
  粗粒度模式(Coarse-grained Mode):程序運行以前就要把所須要的各類資源(每一個 executor 佔用多少資源,內部可運行多少個 executor)申請好,運行過程當中不能改變。
  細粒度模式(Fine-grained Mode):爲了防止資源浪費,對資源進行按需分配。與粗粒度模式同樣,應用程序啓動時,先會啓動 executor,但每一個 executor 佔用資源僅僅是本身運行所需的資源,不須要考慮未來要運行的任務,以後,mesos 會爲每一個 executor 動態分配資源,每分配一些,即可以運行一個新任務,單個 Task 運行完以後能夠立刻釋放對應的資源。每一個 Task 會彙報狀態給 Mesos Slave 和 Mesos Master,便於更加細粒度管理和容錯,這種調度模式相似於 MapReduce 調度模式,每一個 Task 徹底獨立,優勢是便於資源控制和隔離,但缺點也很明顯,短做業運行延遲大。
  從 yarn 和 mesos 的區別可看出,它們各自有優缺點。所以實際使用中,選擇哪一種框架,要根據本公司的實際須要而定,可考慮現有的大數據生態環境。如我司採用 yarn 部署 spark,緣由是,我司早已有較成熟的 hadoop 的框架,考慮到使用的方便性,採用了 yarn 模式的部署。

14.6 異常場景分析

上面說明的是正常狀況下,各節點的消息分發細節。那麼若是在運行中,集羣中的某些節點出現了問題,整個集羣是否還可以正常處理 Application 中的任務呢?

14.6.1 異常分析1:Worker 異常退出


在 Spark 運行過程當中,常常碰到的問題就是 Worker 異常退出,當 Worker 退出時,整個集羣會有哪些故事發生呢?請看下面的具體描述:
  1)Worker 異常退出,好比說有意識的經過 kill 指令將 Worker 殺死。
  2)Worker 在退出以前,會將本身所管控的全部小弟 Executor 全乾掉。
  3)Worker 須要按期向 Master 改善心跳消息的,如今 Worker 進程都已經玩完了,哪有心跳消息,因此 Master 會在超時處理中意識到有一個 「分舵」 離開了。
  4)Master 很是傷心,傷心的 Master 將狀況彙報給了相應的 Driver。
Driver 經過兩方面確認分配給本身的 Executor 不幸離開了,一是 Master 發送過來的通知,二是 Driver 沒有在規定時間內收到 Executor 的 StatusUpdate,因而 Driver 會將註冊的 Executor 移除。

後果分析
Worker 異常退出會帶來哪些影響:
  1)Executor 退出致使提交的 Task 沒法正常結束,會被再一次提交運行。
  2)若是全部的 Worker 都異常退出,則整個集羣不可用。
  3)須要有相應的程序來重啓 Worker 進程,好比使用 supervisord 或 runit。

測試步驟
  1)啓動 Master。
  2)啓動 Worker。
  3)啓動 spark-shell。
  4)手工 kill 掉 Worker 進程。
  5)用 jps 或 ps -ef | grep -i java 來查看啓動着的 java 進程。

異常退出的代碼處理
定義 ExecutorRunner.scala 的 start 函數

def start() {
  workerThread = new Thread("ExecutorRunner for " + fullId) {
    override def run() { fetchAndRunExecutor() }
  }
  workerThread.start()
  // Shutdown hook that kills actors on shutdown.
  shutdownHook = new Thread() {
    override def run() {
      killProcess(Some("Worker shutting down"))
    }
  }
  Runtime.getRuntime.addShutdownHook(shutdownHook)
}

killProcess 的過程就是中止相應 CoarseGrainedExecutorBackend 的過程。
Worker 中止的時候,必定要先將本身啓動的 Executor 中止掉。這是否是很像水滸中宋江的手段,李逵就是這樣不明不白的把命給丟了。

小結
  須要特別指出的是,當 Worker 在啓動 Executor 的時候,是經過 ExecutorRunner 來完成的,ExecutorRunner 是一個獨立的線程,和 Executor 是一對一的關係,這很重要。Executor 做爲一個獨立的進程在運行,但會受到 ExecutorRunner 的嚴密監控。

14.6.2 異常分析2:Executor 異常退出

後果分析
Executor 做爲 Standalone 集羣部署方式下的最底層員工,一旦異常退出,其後果會是什麼呢?
  1)Executor 異常退出,ExecutorRunner 注意到異常,將狀況經過 ExecutorStateChanged 彙報給 Master。
  2)Master 收到通知以後,很是不高興,盡然有小弟要跑路,那還了得,要求 Executor 所屬的 Worker 再次啓動。
  3)Worker 收到 LaunchExecutor 指令,再次啓動 Executor。

測試步驟
  1)啓動 Master
  2)啓動 Worker
  3)啓動 spark-shell
  4)手工 kill 掉 CoarseGrainedExecutorBackend

fetchAndRunExecutor
fetchAndRunExecutor 負責啓動具體的 Executor,並監控其運行狀態,具體代碼邏輯以下所示

def fetchAndRunExecutor() {
  try {
    // Create the executor's working directory
    val executorDir = new File(workDir, appId + "/" + execId)
    if (!executorDir.mkdirs()) {
      throw new IOException("Failed to create directory " + executorDir)
    }

    // Launch the process
    val command = getCommandSeq
    logInfo("Launch command: " + command.mkString("\"""\" \"""\""))
    val builder 
new ProcessBuilder(command: _*).directory(executorDir)
    val env = builder.environment()
    for ((key, value)  {
      logInfo("Runner thread for executor " + fullId + " interrupted")
      state = ExecutorState.KILLED
      killProcess(None)
    }
    case e: Exception 
=> {
      logError("Error running executor", e)
      state = ExecutorState.FAILED
      killProcess(Some(e.toString))
    }
  }
}

14.6.3 異常分析3:Master 異常退出


Worker 和 Executor 異常退出的場景都講到了,咱們剩下最後一種狀況了,Master 掛掉了怎麼辦?

後果分析
帶頭大哥若是不在了,會是什麼後果呢?
  1)Worker 沒有彙報的對象了,也就是若是 Executor 再次跑飛,Worker 是不會將 Executor 啓動起來的,大哥沒給指令。
  2)沒法向集羣提交新的任務。
  3)老的任務即使結束了,佔用的資源也沒法清除,由於資源清除的指令是 Master 發出的。

第15章 wordcount 程序運行原理窺探

15.1 spark 之 scala 實現 wordcount

在 spark 中使用 scala 來實現 wordcount(統計單詞出現次數模型)更加簡單,相對 java 代碼上更加簡潔,其函數式編程的思惟邏輯也更加直觀。

package com.spark.firstApp

import org.apache.spark.{SparkContext, SparkConf}

/**
  * scala 實現 wordcount
  */

object WordCount1 {
  def main(args: Array[String]) {
    if (args.length == 0) {
      System.err.println("Usage: WordCount1 <file1>")
      System.exit(1)
    }
    /**
      * 一、實例化 SparkConf
      * 二、構建 SparkContext,SparkContext 是 spark 應用程序的惟一入口
      * 3. 經過 SparkContext 的 textFile 方法讀取文本文件
      */

    val conf = new SparkConf().setAppName("WordCount1").setMaster("local")
    val sc = new SparkContext(conf)

    /**
      * 四、經過 flatMap 對文本中每一行的單詞進行拆分(分割符號爲空格),並利用 map 進行函數轉換造成 (K,V) 對形式,再進行 reduceByKey,打印輸出 10 個結果
      *    函數式編程更加直觀的反映思惟邏輯
      */

    sc.textFile(args(0)).flatMap(_.split(" ")).map(x => (x, 1)).reduceByKey(_ + _).take(10).foreach(println)
    sc.stop()
  }
}

15.2 原理窺探

在 spark 集羣中運行 wordcount 程序其主要業務邏輯比較簡單,涵蓋一下 3 個過程:
  1)讀取存儲介質上的文本文件(通常存儲在 hdfs 上);
  2)對文本文件內容進行解析,按照單詞進行分組統計彙總;
  3)將過程 2 的分組結果保存到存儲介質上。(通常存儲在 hdfs 或者 RMDB 上)
雖然 wordcount 的業務邏輯很是簡單,但其應用程序在 spark 中的運行過程卻巧妙得體現了 spark 的核心精髓--分佈式彈性數據集內存迭代以及函數式編程等特色。下圖對 spark 集羣中 wordcount 的運行過程進行剖析,加深對 spark 技術原理窺探。

  該圖橫向分割下面給出了 wordcount 的 scala 核心程序實現,該程序在 spark 集羣的運行過程涉及幾個核心的 RDD,主要有 textFileRDD、flatMapRDD、mapToPairRDD、shuffleRDD(reduceByKey)等。   應用程序經過 textFile 方法讀取 hdfs 上的文本文件,數據分片的形式以 RDD 爲統一模式將數據加載到不一樣的物理節點上,如上圖所示的節點 一、節點 2 到節點 n;並經過一系列的數據轉換,如利用 flatMap 將文本文件中對應每行數據進行拆分(文本文件中單詞以空格爲分割符號),造成一個以每一個單詞爲核心新的數據集合 RDD;以後經過 MapRDD 繼續轉換造成造成 (K,V) 對數據形式,以便進一步使用 reduceByKey 方法,該方法會觸發 shuffle 行爲,促使不一樣的單詞到對應的節點上進行匯聚統計(實際上在跨節點進行數據 shuffle 以前會在本地先對相同單詞進行合併累加),造成 wordcount 的統計結果;最終經過 saveAsTextFile 方法將數據保存到 hdfs 上。具體的運行邏輯原理以及過程上圖給出了詳細的示意說明。
相關文章
相關標籤/搜索