轉載請註明出處:http://www.cnblogs.com/BYRans/
算法
RDD的存放和管理都是由Spark的存儲管理模塊實現和管理的。本文從架構和功能兩個角度對Spark的存儲管理模塊進行介紹。緩存
從架構角度,存儲管理模塊主要分爲如下兩層:網絡
在存儲管理模塊的通訊層,每一個Executor上的BlockManager只負責管理其自身Executor所擁有的數據塊原信息,而不會管理其餘Executor上的數據塊元信息;而Driver端的BlockManager擁有全部已註冊的BlockManager信息和數據塊元信息,所以Executor的BlockManager每每是經過向Driver發送信息來得到所須要的非本地數據的。架構
RDD是由不一樣的分區組成的,咱們所進行的轉換和執行操做都是在每一塊獨立的分區上各自進行的。而在存儲管理模塊內部,RDD又被視爲由不一樣的數據塊組成,對於RDD的存取是以數據塊爲單位的,本質上分區(partition)和數據塊(block)是等價的,只是看待的角度不一樣。同時,在Spark存儲管理模塊中存取數據的最小單位是數據塊,全部的操做都是以數據塊爲單位的。框架
前面章節已經提到:存儲管理模塊以數據塊爲單位進行數據管理,數據塊是存儲管理模塊中最小的操做單位。在存儲管理模塊中管理着各類不一樣的數據塊,這些數據塊爲Spark框架提供了不一樣的功能,Spark存儲管理模塊中所管理的幾種主要數據塊爲:socket
從功能角度,存儲管理模塊能夠分爲如下兩個主要部分:函數
存儲管理模塊能夠分爲兩大塊,一是RDD的緩存,二是Shuffle數據的持久化。接下來將介紹存儲管理模塊如何從內存和磁盤兩個方面對RDD進行緩存。性能
對於RDD的各類操做,如轉換操做、執行操做,咱們將操做函數施行於RDD之上,而最終這些操做都將施行於每個分區之上,所以能夠這麼說,在RDD上的全部運算都是基於分區的。而在存儲管理模塊內,咱們所接觸到的每每是數據塊這個概念,在存儲管理模塊中對於數據的存取都是以數據塊爲單位進行的。分區是一個邏輯上的概念,而數據塊是物理上的數據實體,咱們操做的分區和數據塊,它們二者之間有什麼關係呢?本小節咱們將介紹分區與數據塊的關係。spa
在Spark中,分區和數據塊是一一對應的,一個RDD中的一個分區對應着存儲管理模塊中的一個數據塊,存儲管理模塊接觸不到也並不關心RDD,它只關心數據塊,對於數據塊和分區之間的映射則是經過名稱上的約定進行的。.net
這種名稱上的約定是按以下方式創建的:Spark爲每個RDD在其內部維護了獨立的ID號,同時,對於RDD的每個分區也有一個獨立的索引號,所以只要知道ID號和索引號咱們就能找到RDD中的相應分區,也就是說「ID號+索引號」就能全局惟一地肯定這個分區。這樣以「ID號+索引號」做爲塊的名稱就天然地創建起了分區和塊的映射。
在顯示調用調用函數緩存咱們所需的RDD時,Spark在其內部就創建了RDD分區和數據塊之間的映射,而當咱們須要讀取緩存的RDD時,根據上面所提到的映射關係,就能從存儲管理模塊中取得分區對應的數據塊。下圖展現了RDD分區與數據塊之間的映射關係。
當以默認或者基於內存的持久化方式緩存RDD時,RDD中的每一分區所對應的數據塊是會被存儲管理模塊中的內存緩存(Memory Store)所管理的。內存緩存在其內部維護了一個以數據塊名爲鍵,塊內容爲值的哈希表。
在內存緩存中有一個重要的問題是,當內存不是或是已經到達所設置的閾值時應如何處理。在Spark中對於內存緩存可以使用的內存閾值有這樣一個配置:spark.storage.memoryFraction。默認狀況下是0.6,也就是說JVM內存的60%可被內存緩存用來存儲塊內容。當咱們存儲的數據塊所佔用的內存大於60%時,Spark會採起一些策略釋放內存緩存空間:丟棄一些數據塊,或是將一些數據塊存儲到磁盤上以釋放內存緩存空間。是丟棄仍是存儲到磁盤上,依賴於進行操做的這些數據塊的持久化選項,若持久化選項中包含了磁盤緩存,則會將這些塊已入磁盤進行緩存,反之則直接刪除。
那麼直接刪除是否會影響Spark程序的錯誤恢復機制呢?這取決於依賴關係的可回溯性,若該RDD所依賴的祖先RDD是可被回溯並可用的,那麼該RDD所對應的塊被刪除是不會影響錯誤恢復的。反之,若該RDD已是祖先RDD,且數據已沒法被回溯到,那麼程序就會出錯。lost executor錯誤是否是就是這個緣由?
從上面的介紹能夠看出,內存緩存對於數據塊的管理是很是簡單的,本質上就是一個哈希表加上一些存取策略。
磁盤緩存管理數據塊的方式爲,首先,這些數據塊會被存放到磁盤中的特定目錄下。當咱們配置spark.local.dir時,咱們就配置了存儲管理模塊磁盤緩存存放數據的目錄。磁盤緩存初始化時會在這些目錄下建立Spark磁盤緩存文件夾,文件夾的命名方式是:spark-local-yyyyMMddHHmmss-xxxx,其中xxxx是一隨機數。伺候全部的塊內容都將存儲到這些建立的目錄中。
其次,在磁盤緩存中,一個數據塊對應着文件系統中的一個文件,文件名和塊名稱的映射關係是經過哈希算法計算所得的。
總而言之,數據塊對應的文件路徑爲:dirId/subDirId/block_id。這樣咱們就創建了塊和文件之間的對應關係,而存取塊內容就變成了寫入和讀取相應的文件了。
被緩存的數據塊是可容錯恢復的,若RDD的某一分區丟失,他會經過繼承關係自動從新得到。
對於RDD的持久化,Spark爲咱們提供了不一樣的選項,使咱們能將RDD持久化到內存、磁盤,或是以序列化的方式持久化到內存中,設置能夠在集羣的不一樣節點之間存儲多份拷貝。全部這些不一樣的存儲策略都是經過不一樣的持久化選項來決定的。
存儲管理模塊能夠分爲兩大塊,一是RDD的緩存,二是Shuffle數據的持久化。介紹完RDD緩存,接下來介紹Shuffle數據持久化。
下圖爲Spark中Shuffle操做的流程示意圖
首先,每個Map任務會根據Reduce任務的數據量建立出相應的桶,桶的數量是M*R,其中M是Map任務的個數,R是Reduce任務的個數。
其次,Map任務產生的結果會根據所設置的分區算法填充到每一個桶中。這裏的分區算法是可自定義的,固然默認的算法是根據鍵哈希到不一樣的桶中。
當Reduce任務啓動時,它會根據本身任務的ID和所依賴的Map任務的ID從遠端或本地的存儲管理模塊中取得相應的桶做爲任務的輸入進行處理。
Shuffle數據與RDD持久化的不一樣之處在於:
默認的方式會產生大量的文件,如1000個Map任務和1000個Reduce任務,會產生1000000個Shuffle文件,這會對磁盤和文件系統的性能形成極大的影響,所以有了第二種是實現方式,將分時運行的Map任務所產生的Shuffle數據塊合併到同一個文件中,以減小Shuffle文件的總數。對於第二種存儲方式,示意圖以下:
前面介紹了Shuffle數據塊的存取,下面咱們來介紹Shuffle數據塊的讀取和傳輸。Shuffle是將一組任務的輸出結果從新組合做爲下一組任務的輸入這樣的一個過程,因爲任務分佈在不一樣的節點上,所以爲了將重組結果做爲輸入,必然涉及Shuffle數據的讀取和傳輸。
在Spark存儲管理模塊中,Shuffle數據的讀取和傳輸有兩種方式:
前者是默認的獲取方式,經過配置spark.shuffle.use.netty爲true,能夠啓用第二種獲取方式。之因此有兩種Shuffle數據的獲取方式,是由於默認的方式在一些狀況下沒法充分利用網絡帶寬,用戶能夠經過比較兩種方式在性能上的差別來自行決定選用哪一種Shuffle數據獲取方式。
總的來講,Spark存儲管理模塊爲Shuffle數據的持久化作了許多有別於RDD持久化的工做,包括存取Shuffle數據塊的方式,以及讀取和傳輸Shuffle數據塊的方式,全部這些實現都是爲了使Shuffle得到更好的性能和容錯。