spark 源碼分析之十八 -- Spark存儲體系剖析

本篇文章主要剖析BlockManager相關的類以及總結Spark底層存儲體系。html

總述

先看 BlockManager相關類之間的關係以下:java

 

咱們從NettyRpcEnv 開始,作一下簡單說明。node

NettyRpcEnv是Spark 的默認的RpcEnv實現,它提供了個Spark 集羣各個節點的底層通訊環境,能夠參照文章 spark 源碼分析之十二--Spark RPC剖析之Spark RPC總結 作深刻了解。緩存

MemoryManager 主要負責Spark內存管理,能夠參照 spark 源碼分析之十五 -- Spark內存管理剖析作深刻了解。安全

MemoryStore 主要負責Spark單節點的內存存儲,能夠參照 spark 源碼分析之十六 -- Spark內存存儲剖析 作深刻了解。網絡

DiskStore 主要負責Spark單節點的磁盤存儲,能夠參照 spark 源碼分析之十七 -- Spark磁盤存儲剖析 作深刻了解。app

SecurityManager 主要負責底層通訊的安全認證。框架

BlockManagerMaster 主要負責在executor端和driver的通訊,封裝了 driver的RpcEndpointRef。dom

NettyBlockTransferService 使用netty來獲取一組數據塊。異步

MapOutputTracker 是一個跟蹤 stage 的map 輸出位置的類,driver 和 executor 有對應的實現,分別是 MapOutputTrackerMaster 和 MapOutputTrackerWorker。

ShuffleManager在SparkEnv中初始化,它在driver端和executor端都有,負責driver端生成shuffle以及executor的數據讀寫。

BlockManager 是Spark存儲體系裏面的核心類,它運行在每個節點上(drievr或executor),提供寫或讀本地或遠程的block到各類各樣的存儲介質中,包括磁盤、堆內內存、堆外內存。 

 

下面咱們剖析一下以前沒有剖析過,圖中有的類:

SecurityManager

概述

類說明以下:

Spark class responsible for security. In general this class should be instantiated by the SparkEnv and most components should access it from that. 
There are some cases where the SparkEnv hasn't been initialized yet and this class must be instantiated directly.
This class implements all of the configuration related to security features described in the "Security" document.
Please refer to that document for specific features implemented here.

 

這個類主要就是負責Spark的安全的。它是由SparkEnv初始化的。

類結構

其結構以下:

成員變量

WILDCARD_ACL:常量爲*,表示容許全部的組或用戶擁有查看或修改的權限。

authOn:表示網絡傳輸是否啓用安全,由參數 spark.authenticate控制,默認爲 false。

aclsOn:表示,由參數 spark.acls.enable 或 spark.ui.acls.enable 控制,默認爲 false。

adminAcls:管理員權限,由 spark.admin.acls 參數控制,默認爲空字符串。

adminAclsGroups:管理員所在組權限,由 spark.admin.acls.groups 參數控制,默認爲空字符串。

viewAcls:查看控制訪問列表用戶。

viewAclsGroups:查看控制訪問列表用戶組。

modifyAcls:修改控制訪問列表用戶。

modifyAclsGroups:修改控制訪問列表用戶組。

defaultAclUsers:默認控制訪問列表用戶。由user.name 參數和 SPARK_USER環境變量一塊兒設置。

secretKey:安全密鑰。

hadoopConf:hadoop的配置對象。

defaultSSLOptions:默認安全選項,以下:

其中SSLOption的parse 方法以下,主要用於一些安全配置的加載:

defaultSSLOptions跟getSSLOptions方法搭配使用:

核心方法

1. 設置獲取 adminAcls、viewAclsGroups、modifyAcls、modifyAclsGroups變量的方法,比較簡單,再也不說明。

2. 檢查UI查看的權限以及修改權限:

3. 獲取安全密鑰:

4. 獲取安全用戶:

5. 初始化安全:

 

總結

這個類主要是用於Spark安全的,主要包含了權限的設置和獲取的方法,密鑰的獲取、安全用戶的獲取、權限驗證等功能。

下面來看一下BlockManagerMaster類。

BlockManagerMaster

概述

BlockManagerMaster 這個類是對 driver的 EndpointRef 的包裝,能夠說是 driver EndpointRef的一個代理類,在請求訪問driver的時候,調用driver的EndpointRef的對應方法,並處理其返回。

類結構 

其類結構以下:

主要是一些經過driver獲取的節點或block、或BlockManager信息的功能函數。

成員變量

driverEndpoint是一個EndpointRef 對象,能夠指本地的driver 的endpoint 或者是遠程的 endpoint引用,經過它既能夠和本地的driver進行通訊,也能夠和遠程的driver endpoint 進行通訊。

timeout 是指的 Spark RPC 超時時間,默認爲 120s,能夠經過spark.rpc.askTimeout 或 spark.network.timeout 參數來設置。

核心方法:

1. 移除executor,有同步和異步兩種方案,這兩個方法只會在driver端使用。以下:

2. 向driver註冊blockmanager

3. 更新block信息

4. 向driver請求獲取block對應的 location信息

 

5. 向driver 請求得到集羣中全部的 blockManager的信息

4. 向driver 請求executor endpoint ref 對象

5. 移除block、RDD、shuffle、broadcast

 

6. 向driver 請求獲取每個BlockManager內存狀態

7. 向driver請求獲取磁盤狀態

8. 向driver請求獲取block狀態

9. 是否有匹配的block

 

10.檢查是否緩存了block

其依賴方法tell 方法以下:

總結

BlockManagerMaster 主要負責和driver的交互,來獲取跟底層存儲相關的信息。

ShuffleClient

類說明

它定義了從executor或者是外部服務讀取shuffle數據的接口。

核心方法

1. init方法用於初始化ShuffleClient,須要指定executor 的appId

2. fetchBlocks 用於異步從另外一個節點請求獲取blocks,參數解釋以下:

host – the host of the remote node.
port – the port of the remote node.
execId – the executor id.
blockIds – block ids to fetch.
listener – the listener to receive block fetching status.
downloadFileManager – DownloadFileManager to create and clean temp files. If it's not null, the remote blocks will be streamed into temp shuffle files to reduce the memory usage, otherwise, they will be kept in memory.

3. shuffleMetrics 用於記錄shuffle相關的metrics信息

BlockTransferService

類說明

它是ShuffleClient的子類。它是ShuffleClient的抽象實現類,定義了讀取shuffle的基礎框架。

核心方法

init 方法,它額外提供了使用BlockDataManager初始化的方法,方便從本地獲取block或者將block存入本地。

close:關閉ShuffleClient

port:服務正在監聽的端口

hostname:服務正在監聽的hostname

fetchBlocks 跟繼承類同樣,沒有實現,因爲繼承關係能夠不寫。

uploadBlocks:上傳block到遠程節點,返回一個future對象

fetchBlockSync:同步抓取遠程節點的block,直到block數據獲取成功才返回,以下:

它定義了block 抓取後,對返回結果處理的基本框架。

 uploadBlockSync 方法:同步上傳信息,直到上傳成功才結束。以下:

ManagedBuffer的三個子類

在 spark 源碼分析之十七 -- Spark磁盤存儲剖析 中已經說起過ManagedBuffer類。

下面看一下ManagedBuffler的三個子類:FileSegmentManagedBuffer、EncryptedManagedBuffer、NioManagedBuffer

FileSegmentManagedBuffer:由文件中的段支持的ManagedBuffer。

EncryptedManagedBuffer:由加密文件中的段支持的ManagedBuffer。

NioManagedBuffer:由ByteBuffer支持的ManagedBuffer。

NettyBlockTransferService

類說明:

它是BlockTransferService,使用netty來一次性獲取shuffle的block數據。

成員變量

hostname:TransportServer 監聽的hostname

serializer:JavaSerializer 實例,用於序列化反序列化java對象。

authEnabled:是否啓用安全

transportConf:TransportConf 對象,主要是用於初始化shuffle的線程數等配置。,spark.shuffle.io.serverThreads 和 spark.shuffle.io.clientThreads,默認是線程數在 [1,8] 個,這跟可用core的數量和指定core數量有關。 這兩個參數決定了底層netty server端和client 端的線程數。

transportContext:TransportContext 用於建立TransportServer和TransportClient的上下文。

server:TransportServer對象,是Netty的server端線程。

clientFactory:TransportClientFactory 用於建立TransportClient

appId:application id,由 spark.app.id 參數指定

核心方法

1. init 方法主要用於初始化底層netty的server和client,以下:

關於底層RPC部分的內容,在Spark RPC 剖析系列已經作過說明,參照 spark 源碼分析之十二--Spark RPC剖析之Spark RPC總結 作進一步瞭解。

2. 關閉ShuffleClient:

3. 上傳數據:

config.MAX_REMOTE_BLOCK_SIZE_FETCH_TO_MEM 是由spark.maxRemoteBlockSizeFetchToMem參數決定的,默認是 整數最大值 - 512.

因此整數範圍內的block數據,是由 netty RPC來處理的,128MB顯然是在整數範圍內的,因此hdfs上的block 數據spark都是經過netty rpc來通訊傳輸的。

 

4. 從遠程節點獲取block數據,源碼以下:

首先數據抓取是能夠支持重試的,重試次數默認是3次,能夠由參數 spark.shuffle.io.maxRetries 指定,其實是由OneForOneBlockFetcher來遠程抓取數據的。

重試抓取遠程block機制的設計

當重試次數不大於0時,直接使用的是BlockFetchStarter來生成 OneForOneBlockFetcher 抓取數據。

當次數大於0 時,則使用 RetryingBlockFetcher 來重試式抓取數據。

 

先來看一下其成員變量:

executorService: 用於等待執行重試任務的共享線程池

fetchStarter:初始化 OneForOneBlockFetcher 對象

listener:監聽抓取block成功或失敗的listener

maxRetries;最大重試次數。

retryWaitTime:下一次重試間隔時間。能夠經過 spark.shuffle.io.retryWait參數設置,默認是 5s。

retryCount:已重試次數。

outstandingBlocksIds:剩餘須要抓取的blockId集合。

currentListener:它只監聽當前fetcher的返回。

 

核心方法:

思路:首先,初始化須要抓取的blockId列表,已重試次數,以及currentListener。而後去調用fetcherStarter開始抓取任務,每個block抓取成功後,都會調用currentListener對應成功方法,失敗則會調用 currentListener 失敗方法。在fetch過程當中數據有異常出現,則先判斷是否須要重試,若需重試,則初始化重試,將wait和fetch任務放到共享線程池中去執行。

下面看一下,相關方法和類:

1. RetryingBlockFetchListener 類。它有兩個方法,一個是抓取成功的回調,一個是抓取失敗的回調。

在抓取成功回調中,會先判斷當前的currentListener是不是它自己,而且返回的blockId在須要抓取的blockId列表中,若兩個條件都知足,則會從須要抓取的blockId列表中把該blockId移除而且去調用listener相對應的抓取成功方法。

在抓取失敗回調中,會先判斷當前的currentListener是不是它自己,而且返回的blockId在須要抓取的blockId列表中,若兩個條件都知足,再判斷是否須要重試,如需重試則重置重試機制,不然直接調用listener的抓取失敗方法。

 

2. 是否須要重試:

思路:若是是IO 異常而且還有剩餘重試次數,則重試。

3.  初始化重試:

總結:該重試的blockFetcher 引入了中間層,即自定義的RetryingBlockFetchListener 監聽器,來完成重試或事件的傳播機制(即調用原來的監聽器的抓取失敗成功對應方法)以及須要抓取的blockId列表的更新,重試次數的更新等操做。

MapOutputTracker

類說明

MapOutputTracker 是一個定位跟蹤 stage 的map 輸出位置的類,driver 和 executor 有對應的實現,分別是 MapOutputTrackerMaster 和 MapOutputTrackerWorker。

其類結構以下:

成員變量

trackerEndpoint:它是一個EndpointRef對象,是driver端 MapOutputTrackerMasterEndpoint 的在executor的代理對象。

epoch:The driver-side counter is incremented every time that a map output is lost. This value is sent to executors as part of tasks, where executors compare the new epoch number to the highest epoch number that they received in the past. If the new epoch number is higher then executors will clear their local caches of map output statuses and will re-fetch (possibly updated) statuses from the driver.

eposhLock: 一個鎖對象

核心方法

1. 向driver端trackerEndpoint 發送消息

2. excutor 獲取每個shuffle中task 須要讀取的範圍的 block信息,partition範圍包頭不包尾。

3. 刪除指定的shuffle的狀態信息

4. 中止服務

其子類MapOutputTrackerMaster 和 MapOutputTrackerWorker在後續shuffle 剖許再做進一步說明。 

ShuffleManager

類說明

它是一個可插拔的shuffle系統,ShuffleManager 在driver和每個executor的SparkEnv中基於spark.shuffle.manager參數建立,driver使用這個類來註冊shuffle,executor或driver本地任務能夠請求ShuffleManager 來讀寫任務。

類結構

1. registerShuffle:Register a shuffle with the manager and obtain a handle for it to pass to tasks.

2. getWriter:Get a writer for a given partition. Called on executors by map tasks.

3. getReader:Get a reader for a range of reduce partitions (startPartition to endPartition-1, inclusive). Called on executors by reduce tasks.

4. unregisterShuffle:Remove a shuffle's metadata from the ShuffleManager.

5. shuffleBlockResolver:Return a resolver capable of retrieving shuffle block data based on block coordinates.

6. stop:Shut down this ShuffleManager.

其有惟一子類 SortShuffleManager,咱們在剖析spark shuffle 過程時,再作進一步說明。 

 

下面,咱們來看Spark存儲體系裏面的重頭戲 -- BlockManager

BlockManager

類說明

Manager running on every node (driver and executors) which provides interfaces for putting and retrieving blocks both locally and remotely into various stores (memory, disk, and off-heap). 
Note that initialize( ) must be called before the BlockManager is usable.

 

它運行在每個節點上(drievr或executor),提供寫或讀本地或遠程的block到各類各樣的存儲介質中,包括磁盤、堆內內存、堆外內存。

構造方法

其中涉及的變量,以前基本上都已做說明,再也不說明。

這個類結構很是龐大,再也不展現類結構圖。下面分別對其成員變量和比較重要的方法作一下說明。

成員變量

externalShuffleServiceEnabled: 是否啓用外部shuffle 服務,經過spark.shuffle.service.enabled 參數配置,默認是false

remoteReadNioBufferConversion:是否 xxxxx, 經過 spark.network.remoteReadNioBufferConversion 參數配置,默認是 false

diskBlockManager:DiskBlockManager對象,用於管理block和物理block文件的映射關係的

blockInfoManager:BlockInfoManager對象,Block讀寫鎖

futureExecutionContext:ExecutionContextExecutorService 內部封裝了一個線程池,線程前綴爲 block-manager-future,最大線程數是 128

memoryStore:MemoryStore 對象,用於內存存儲。

diskStore:DiskStore對象,用於磁盤存儲。

maxOnHeapMemory:最大堆內內存

maxOffHeapMemory:最大堆外內存

externalShuffleServicePort: 外部shuffle 服務端口,經過 spark.shuffle.service.port 參數設置,默認爲 7337

blockManagerId:BlockManagerId 對象是blockManager的惟一標識

shuffleServerId:BlockManagerId 對象,提供shuffle服務的BlockManager的惟一標識

shuffleClient:若是啓用了外部存儲,即externalShuffleServiceEnabled爲true,使用ExternalShuffleClient,不然使用經過構造參數傳過來的 blockTransferService 對象。

maxFailuresBeforeLocationRefresh:下次從driver刷新block location時須要重試的最大次數。經過spark.block.failures.beforeLocationRefresh 參數來設置,默認時 5

slaveEndpoint:BlockManagerSlaveEndpoint的ref對象,負責監聽處理master的請求。

asyncReregisterTask:異步註冊任務

asyncReregisterLock:鎖對象

cachedPeers:Spark集羣中全部的BlockManager

peerFetchLock:鎖對象,用於獲取spark 集羣中全部的blockManager時用

lastPeerFetchTime:最近獲取spark 集羣中全部blockManager的時間

blockReplicationPolicy:BlockReplicationPolicy 對象,它有兩個子類 BasicBlockReplicationPolicy 和 RandomBlockReplicationPolicy。

remoteBlockTempFileManager:RemoteBlockDownloadFileManager 對象

maxRemoteBlockToMem:經過 spark.maxRemoteBlockSizeFetchToMem 參數控制,默認爲整數最大值 - 512

核心方法[簡版]

注:未作過多的分析,大部份內容在以前內存存儲和磁盤存儲中都已涉及。

1. 初始化方法

思路:初始化 blockReplicationPolicy, 能夠經過參數 spark.storage.replication.policy  來指定,默認爲 RandomBlockReplicationPolicy;初始化BlockManagerId並想driver註冊該BlockManager;初始化shuffleServerId

 

2. 從新想driver註冊blockManager方法:

思路: 經過 BlockManagerMaster 想driver 註冊 BlockManager

 

3. 獲取block數據,以下:

其依賴方法 getLocalBytes 以下,思路:若是是shuffle的數據,則經過shuffleBlockResolver獲取block信息,不然使用BlockInfoManager加讀鎖後,獲取數據。

doGetLocalBytes 方法以下,思路:按照是否須要反序列化、是否保存在磁盤中,作相應處理,操做直接依賴與MemoryStore和DiskStore。

 

4. 存儲block數據,直接調用putBytes 方法:

其依賴方法以下,直接調用doPutBytes 方法:

doPutBytes 方法以下:

doPut 方法以下,思路,加寫鎖,執行putBody方法:

 

5. 保存序列化以後的字節數據

6. 保存java對象:

7. 緩存讀取的數據在內存中:

 

8. 獲取Saprk 集羣中其餘的BlockManager信息:

9. 同步block到其餘的replicas:

其依賴方法以下:

10.把block從內存中驅逐:

11. 移除block:

12. 中止方法

 

BlockManager 主要提供寫或讀本地或遠程的block到各類各樣的存儲介質中,包括磁盤、堆內內存、堆外內存。獲取Spark 集羣的BlockManager的信息、驅逐內存中block等等方法。

其遠程交互依賴於底層的netty模塊。有不少的關於存儲的方法都依賴於MemoryStore和DiskStore的實現,再也不作一一解釋。

總結

本篇文章介紹了Spark存儲體系的最後部份內容。行文有些倉促,有一些類可能會漏掉,但對於理解Spark 存儲體系已經綽綽有餘。本地存儲依賴於MemoryStore和DiskStore,遠程調用依賴於NettyBlockTransferService、BlockManagerMaster、MapOutputTracker等,其底層絕大多數依賴於netty與driver或其餘executor通訊。

Spark shuffle、broadcast等也是依賴於存儲系統的。接下來將進入spark的核心部分,去探索Spark底層的RDD是如何構建Stage做業以及每個做業是如何工做的。

相關文章
相關標籤/搜索