Spark BlockManager的通訊及內存佔用分析(源碼閱讀九)

  以前閱讀也有總結過Block的RPC服務是經過NettyBlockRpcServer提供打開,即下載Block文件的功能。而後在啓動jbo的時候由Driver上的BlockManagerMaster對存在於Executor上的BlockManager統一管理,註冊Executor的BlockManager、更新Executor上Block的最新信息、詢問所須要Block目前所在的位置以及當Executor運行結束時,將Executor移除等等。那麼Driver與Executor之間是怎麼交互的呢?數組

  在Spark1.6時,Drvier的BlockManagerMaster與BlockManager之間的通訊,再也不是經過AkkaUtil,而是用了RpcEndpoint,也就木有了BlockManagerMasterActor,而是BlockManagerMasterEndpoint:緩存

  

  BlockManagerMaster與BlockManager之間的通訊已經使用RPC遠程過程調用來實現,RPC相關配置參數以下:安全

  spark.rpc.retry.wait 3s(默認)等待時長 、 spark.rpc.numRetries 3(默認)重試次數、spark.rpc.askTimeout 120s(默認)請求時長、spark.rpc.lookupTimeoutspark.network.timeout 120s(默認)查找時長,是要一塊兒配置。源碼分析

  好的,咱們繼續,每一個executor中的BlockManager的建立,都要通過BlockManagerMaster註冊BlockManagerId.post

  

  Executor或Driver自身的BlockMnager在初始化時,須要向Driver的BlockManager註冊BlockMnager信息,註冊的消息內容包括BlockMnagerI的d時間戳最大內存、以及slaveEndpoint。帶有slaveEndpoint的目的是爲了便於接收BlockManagerMaster回覆的消息,在register方法執行結束後向發送者BlockManageMaster發送一個簡單的消息true.優化

    

  register方法確保blockManagerInfo持有消息中的blockManagerId及對應消息,而且確保每一個Executor最多隻能有一個blockManagerId,舊的blockManagerId會被移除。最後向listenerBus中post(推送)一個sparkListenerBlockManagerAdded事件。spa

  那麼下來,開始磁盤管理器DiskBlockManager的構造:.net

  

  咱們能夠看到BlcokManager初始化時,建立DiskBlockManager,在建立時,調用了createLocalDirs方法建立本地文件目錄,而後建立了二維數組subDirs,用來緩存一級目錄localDirs及二級目錄,其中二級目錄的數量根據配置spark.diskStore.subDirectories獲取,默認爲64.那麼爲何DisBlockManager要建立二級目錄?由於二級目錄用於對文件進行散列存儲,散列存儲可使全部文件都隨機存放,寫入或刪除文件更方便,存取速度快,節省空間。那麼咱們再細化看下這個磁盤路徑是怎麼配置的,從哪裏來的?線程

  

  從圖中能夠看到,這個路徑來源於spark.local.dir,可是呢,若是是spark on yarn模式,那麼真正的路徑是由yarn的配置參數決定的,參數爲YARN_LOCAL_DIRS3d

  接下來查閱源碼還會發現有個addShutdownHock()方法,它是幹什麼的呢,它是用來添加運行時環境結束時,在進程關閉的時候建立線程,經過調用Disk-BlockMnager的stop方法,清除一些臨時目錄:

    下來咱們來探索下,是如何獲取磁盤文件的?

    

  首先咱們能夠看到,nonNegativeHash方法,該方法用來根據文件名計算哈希值。而後根據哈希值與本地文件以及目錄的總數求餘數,記爲dirId。隨後又根據哈希值與本地文件一級目錄的總數求商數,此商數與二級目錄的數目再求餘數,記爲subDirId.那麼若是dirId/subDirId目錄存在,則獲取dirId/subDirId目錄下的文件,不然建立dirId/subDirId目錄。

  好的下來咱們來建立本地臨時文件與shuffle過程的臨時文件:

  

   咱們能夠看到,當MemoryStore沒有足夠空間時,就會使用DiskStore將塊存入磁盤。當ShuffleMapTask運行結束須要把中間結果臨時保存,此時就調用了createTempShuffleBlock方法建立臨時Block,並返回TempShuffleBlockId與其文件的對偶,同時拼上隨機字符串標識。

         那麼下來,咱們再深刻了解下MemoryStore,咱們在配置spark的時候,會配置計算內存與緩存內存的比例,實質是經過MemoryStore將沒有序列化的Java對象數組或者序列化的ByteBuffer存儲到內存中,那麼MemoryStore是如何構造的呢?

    

   整個MemoryStore的存儲分爲兩塊:一塊是被不少MemeoryEntry佔據的內存currentMemory,這些currentMemory其實是經過entryes持有的;另外一塊兒是經過unrollMemoryMap經過佔座方式佔用的內存currentUnrollMemory.其實意思就是預留空間,能夠防止在向內存真正寫入數據時,內存不足發生溢出。查閱數據,記錄些概念:

  -maxUnrollMemory:當前Driver或者Executor最多展開的Block所佔用的內存,能夠修改spark.storage.unrollFraction的大小。

  -maxMemory:當前Driver或者Executor的最大內存。

  -currentMemory:當前Driver或者Executor已經使用的內存。

  -freeMemory:當前Driver或Executor未使用內存。freeMemoy = maxMemory - currentMemory

  

  這裏有個重要的點,叫作unrollSafely,爲了防止寫入內存的數據過大,致使內存溢出,Spark採用了一種優化方案,在正式寫入內存以前,先用邏輯方式申請內存,若是申請成功,再寫入內存,這個過程就跟名字同樣了,稱爲安全展開

  就到這裏好了,去吃飯~

  

參考文獻:《深刻理解Spark:核心思想與源碼分析》

相關文章
相關標籤/搜索