spark 源碼分析之十七 -- Spark磁盤存儲剖析

上篇文章 spark 源碼分析之十六 -- Spark內存存儲剖析 主要剖析了Spark 的內存存儲。本篇文章主要剖析磁盤存儲。html

總述

磁盤存儲相對比較簡單,相關的類關係圖以下:java

 

 

咱們先從依賴類 DiskBlockManager 剖析。node

 

DiskBlockManager

文檔說明以下:數組

Creates and maintains the logical mapping between logical blocks and physical on-disk locations. 
One block is mapped to one file with a name given by its BlockId.
Block files are hashed among the directories listed in spark.local.dir (or in SPARK_LOCAL_DIRS, if it's set).

建立並維護邏輯block和block落地的物理文件的映射關係。一個邏輯block經過它的BlockId的name屬性映射到具體的文件。多線程

 

類結構

其類結構以下:app

能夠看出,這個類主要用於建立並維護邏輯block和block落地文件的映射關係。保存映射關係,有兩個解決方案:一者是使用Map存儲每一條具體的映射鍵值對,兩者是指定映射函數像分區函數等等,給定的key經過映射函數映射到具體的value。dom

成員變量

成員變量以下:異步

subDirsPerLocalDir:這個變量表示本地文件下有幾個文件,默認爲64,根據參數 spark.diskStore.subDirectories 來調節。ide

subDirs:是一個二維數組表示本地目錄和子目錄名稱的組合關係,即 ${本地目錄1 ... 本地目錄n}/${子目錄1 ... 子目錄64}函數

localDirs:表示block落地本地文件根目錄,經過 createLocalDirs 方法獲取,方法以下:

思路:它先調用調用Utils的 getConfiguredLocalDirs 方法,獲取到配置的目錄集合,而後map每個父目錄,調用Utils的createDirectory方法,在每個子目錄下建立一個 以blockmgr 爲前綴的目錄。其依賴方法 createDirectory 以下:

這個方法容許重試次數爲10,目的是爲了防止建立的目錄跟已存在的目錄重名。

 

getConfiguredLocalDirs 方法以下:

大多數生產狀況下,都是使用yarn,咱們直接看一下spark on yarn 環境下,目錄到底在哪裏。直接來看getYarnLocalDirs方法:

LOCAL_DIRS的定義是什麼?

任務是跑在yarn 上的,下面就去定位一下hadoop yarn container的相關源碼。

定位LOCAL_DIRS環境變量

在ContainerLaunch類的 sanitizeEnv 方法中,找到了以下語句:

 

addToMap 方法以下:

即,數據被添加到了envirment map變量和 nmVars set集合中了。

在ContainerLaunch 的 call 方法中調用了 sanitizeEnv 方法:

appDirs變量定義以下:

即每個 appDir格式以下:${localDir}/usercache/${user}/appcache/${application-id}/

localDirs 定義以下:

dirHandler是一個 LocalDirsHandlerService 類型變量,這是一個服務,在其serviceInit方法中,實例化了 MonitoringTimerTask對象:

在 MonitoringTimerTask 構造方法中,發現了:

 NM_LOCAL_DIRS 常量定義以下:

 

即:yarn.nodemanager.local-dirs 參數,該參數定義在yarn-default.xml下。

即localDir以下:

${yarn.nodemanager.local-dirs}/usercache/${user}/appcache/${application-id}/

再結合createDirectory方法,磁盤存儲的本地目錄是:

 ${yarn.nodemanager.local-dirs}/usercache/${user}/appcache/${application-id}/blockmgr-隨機的uuid/

核心方法

根據文件內容建立File對象,以下:

思路:先根據filename即blockId的name字段生成正的hashcode(abs(hashcode))

dirId 是指的第幾個父目錄(從0開始數),subDirId是指的父目錄下的第幾個子目錄(從0開始數)。最後拼接父子目錄爲一個新的父目錄subDir。

而後以subDir爲父目錄,建立File對象,並返回之。

跟getFile 方法相關的方法以下:

比較簡單,不作過多說明。

 

建立一個臨時Block,包括臨時本地block 或 shuffle block,以下:

 

還有一個方法,是中止 DiskBlockManager以後的回調方法:

若deleteFilesOnStop 爲 true,即DiskBlockManager中止時,是否須要清除本地存儲的block文件。

在 BlockManager 中初始化DiskBlockManager時,deleteFilesOnStop 經過構造方法傳入

 

總結:DiskBlockManager 是用來建立並維護邏輯block和落地後的block文件的映射關係的,它還負責建立用於shuffle或本地的臨時文件。

 下面看一下在DiskStore中可能會用到的類以及其相關類的說明。

CountingWritableChannel 

它主要對sink作了包裝,在寫入sink的同時,還記錄向sink寫的數據的總量。源碼以下:

代碼比較簡單,不作過多說明。

ManagedBuffer

類說明以下:

This interface provides an immutable view for data in the form of bytes. 
The implementation should specify how the data is provided:
- FileSegmentManagedBuffer: data backed by part of a file
- NioManagedBuffer: data backed by a NIO ByteBuffer
- NettyManagedBuffer: data backed by a Netty ByteBuf
The concrete buffer implementation might be managed outside the JVM garbage collector.
For example, in the case of NettyManagedBuffer, the buffers are reference counted.
In that case, if the buffer is going to be passed around to a different thread, retain/release should be called.

 

類結構以下:

 

EncryptedManagedBuffer

它是一個適配器,它將幾乎因此轉換的請求委託給了 blockData,下面來看一下這個類相關的剖析。

首先先看一下它的父類 -- BlockData

 

BlockData

接口說明以下:

它是一個接口,它定義了存儲方式以及如何提供不一樣的方式來讀去底層的block 數據。

定義方法以下:

方法說明以下:

toInputStream用於返回用於讀取該文件的輸入流。

toNetty用於返回netty對block數據的包裝類,方便netty包來讀取數據。

toChunkedByteBuffer用於將block包裝成ChunkedByteBuffer。

toByteBuffer 用於將block數據轉換爲內存中直接讀取的 ByteBuffer 對象。

當對該block的操做執行完畢後,須要調用dispose來作後續的收尾工做。

size表示block文件的大小。

它有三個子類:DiskBlockData、EncryptedBlockData和ByteBufferBlockData。

即block的三種存在形式:磁盤、加密後的block和內存中的ByteBuffer

分別介紹以下:

 

DiskBlockData

該類主要用於將磁盤中的block文件轉換爲指定的流或對象。

先來看其簡單的方法實現:

構造方法:

相關字段說明以下:

minMemoryMapBytes表示 磁盤block映射到內存塊中最小大小,默認爲2MB,能夠經過 spark.storage.memoryMapThreshold 進行調整。

maxMemoryMapBytes表示 磁盤block映射到內存塊中最大大小,默認爲(Integer.MAX_VALUE - 15)B,能夠經過 spark.storage.memoryMapLimitForTests 進行調整。

對應源碼以下:

比較簡單的方法以下: 

size方法直接返回block文件的大小。

dispose空實現。

open是一個私有方法,主要用於獲取讀取該block文件的FileChannel對象。

 

toByteBuffer方法實現以下:

 

Utils的tryWithResource方法以下,它先執行createResource方法,而後執行Function對象的apply方法,最終釋放資源,思路就是 建立資源 --使用資源-- 釋放資源三步曲:

即先獲取讀取block文件的FileChannel對象,若blockSize 小於 最小的內存映射字節大小,則將channel的數據讀取到buffer中,返回的是HeapByteBuffer對象,即數據被寫入到了堆裏,即它是non-direct buffer,至關於數據被讀取到中間臨時內存中,不然使用FileChannelImpl的map方法返回 MappedByteBuffer 對象。

MappedByteBuffer文檔說明以下:

A direct byte buffer whose content is a memory-mapped region of a file.
Mapped byte buffers are created via the FileChannel.map method. This class extends the ByteBuffer class with operations that are specific to memory-mapped file regions.
A mapped byte buffer and the file mapping that it represents remain valid until the buffer itself is garbage-collected.
The content of a mapped byte buffer can change at any time, for example if the content of the corresponding region of the mapped file is changed by this program or another. Whether or not such changes occur, and when they occur, is operating-system dependent and therefore unspecified. 
All or part of a mapped byte buffer may become inaccessible at any time, for example if the mapped file is truncated. An attempt to access an inaccessible region of a mapped byte buffer will not change the buffer's content and will cause an unspecified exception to be thrown either at the time of the access or at some later time. It is therefore strongly recommended that appropriate precautions be taken to avoid the manipulation of a mapped file by this program, or by a concurrently running program, except to read or write the file's content.
Mapped byte buffers otherwise behave no differently than ordinary direct byte buffers.

 

它是direct buffer,即直接從磁盤讀數據,不通過中間臨時內存,能夠參照ByteBuffer的文檔對Direct vs. non-direct buffers 的說明以下:

Direct vs. non-direct buffers
A byte buffer is either direct or non-direct. Given a direct byte buffer, the Java virtual machine will make a best effort to perform native I/O operations directly upon it. That is, it will attempt to avoid copying the buffer's content to (or from) an intermediate buffer before (or after) each invocation of one of the underlying operating system's native I/O operations.
A direct byte buffer may be created by invoking the allocateDirect factory method of this class. The buffers returned by this method typically have somewhat higher allocation and deallocation costs than non-direct buffers. The contents of direct buffers may reside outside of the normal garbage-collected heap, and so their impact upon the memory footprint of an application might not be obvious. It is therefore recommended that direct buffers be allocated primarily for large, long-lived buffers that are subject to the underlying system's native I/O operations. In general it is best to allocate direct buffers only when they yield a measureable gain in program performance.
A direct byte buffer may also be created by mapping a region of a file directly into memory. An implementation of the Java platform may optionally support the creation of direct byte buffers from native code via JNI. If an instance of one of these kinds of buffers refers to an inaccessible region of memory then an attempt to access that region will not change the buffer's content and will cause an unspecified exception to be thrown either at the time of the access or at some later time.
Whether a byte buffer is direct or non-direct may be determined by invoking its isDirect method. This method is provided so that explicit buffer management can be done in performance-critical code. 

 

toChunkedByteBuffer 方法以下:

首先,ChunkedByteBuffer對象裏包含的是數據分紅多個小的chunk,而不是連續的數組。

先把文件讀到內存中的 HeapByteBuffer 對象中即單個chunk,而後放入存放chunk的ListBuffer中,最終轉換爲Array存入到ChunkedByteBuffer 對象中。

toNetty實現以下:

DefaultFileRegion說明請繼續向下看,先不作過多說明。

 

EncryptedBlockData

這個類主要是用於加密的block磁盤文件轉換爲特定的流或對象。

構造方法以下:

file指block文件,blockSize指block文件大小,key是用於加密的密鑰。

先來看三個比較簡單的方法:

 

open方法再也不直接根據FileInputStream獲取其 FileChannelImpl 對象了,而是獲取 FileChannelImpl 以後,再調用了 CryptoStreamUtils 的 createReadableChannel 方法,以下:

進一步將channel 對象封裝爲 CryptoInputStream 對象,對ErrorHandlingReadableChannel的讀操做,其實是讀的 CryptoInputStream,這個流內部有一個根據key來初始化的加密器,這個加密器負責對數據的解密操做。

 

toByteBuffer方法以下:

思路:若是block數據大小在整數範圍內,則直接將加密的block解密以後存放在內存中。

toChunkedByteBuffer方法除了解密操做外,跟DiskBlockData 中toChunkedByteBuffer方法無異,不作過多說明,代碼以下:

toNetty 方法,源碼以下:

ReadableChannelFileRegion類在下文介紹,先不作過多說明。

 

toInputStream方法,源碼以下:

思路:這個就不能直接open方法返回的獲取inputStream,由於 CryptoInputStream 是沒有獲取inputStream的接口的,Channels.newInputStream返回的是ChannelInputStream,ChannelInputStream對channel作了裝飾。

ByteBufferBlockData

總體比較簡單,主要來看一下dispose方法,ChunkedByteBuffer 方法的 dispose 以下:

即便用StorageUtils的dispose 方法去清理每個chunk,StorageUtils的dispose 方法以下:

即獲取它的cleaner,而後調用cleaner的clean方法。咱們以 DirectByteBufferR 爲例,作進一步說明:

在其構造方法中初始化Cleaner,以下:

base是調用unsafe類的靜態方法allocateMemory分配指定大小內存後返回的內存地址,size是內存大小。

類聲明:

沒錯它是一個虛引用,隨時會被垃圾回收。

 

Cleaner的構造方法以下:

var1 是待清理的對象,var2 是執行清理任務的Runnable對象。

再看它的成員變量:

沒錯,它本身自己就是雙向鏈表上的一個節點,也是雙向鏈表。

 其create 方法以下:

思路:建立cleanr並把它加入到雙向鏈表中。

 

Cleaner的 clean方法以下:

它會先調用remove 方法,調用成功則執行內存清理任務,注意這裏沒有異步任務同步調用Runnable的run方法。

remove 方法以下:

思路:從雙向鏈表中移除指定的cleaner。

Deallocator 類以下:

unsafe的allocateMemory方法使用了off-heap memory,這種方式的內存分配不是在堆裏,不受GC的管理,使用Unsafe.freeMemory()來釋放它。

先調用 unsafe釋放內存,而後調用Bits的 unreserveMemory 方法:

至此,dispose 方法結束。

 

 

下面看一下,ReadableChannelFileRegion的繼承關係:

咱們按繼承關係來看類: ReferenceCounted --> FileRegion --> AbstractReferenceCounted --> AbstractFileRegion --> ReadableChannelFileRegion。

ReferenceCounted

類說明以下:

A reference-counted object that requires explicit deallocation.
When a new ReferenceCounted is instantiated, it starts with the reference count of 1. 
retain() increases the reference count, and release() decreases the reference count.
If the reference count is decreased to 0, the object will be deallocated explicitly,
and accessing the deallocated object will usually result in an access violation. If an object that implements ReferenceCounted is a container of other objects that implement ReferenceCounted,
the contained objects will also be released via release() when the container's reference count becomes 0.

這是netty包下的一個接口。

它是一個引用計數對象,須要顯示調用deallocation。

ReferenceCounted對象實例化時,引用計數設爲1,調用retain方法增長引用計數,release方法則釋放引用計數。

若是引用計數減小至0,對象會被顯示deallocation,訪問已經deallocation的對象會形成訪問問題。

若是一個對象實現了ReferenceCounted接口的容器包含了其餘實現了ReferenceCounted接口的對象,當容器的引用減小爲0時,被包含的對象也須要經過 release 方法釋放之,即引用減1。

主要有三類核心方法:

retain:Increases the reference count by 1 or the specified increment.

touch:Records the current access location of this object for debugging purposes. If this object is determined to be leaked, the information recorded by this operation will be provided to you via ResourceLeakDetector. This method is a shortcut to touch(null).

release:Decreases the reference count by 1 and deallocates this object if the reference count reaches at 0. Returns true if and only if the reference count became 0 and this object has been deallocated

refCnt:Returns the reference count of this object. If 0, it means this object has been deallocated.

FileRegion

它也是netty下的一個包,FileRegion數據經過支持零拷貝的channel將數據傳輸到目標channel。

A region of a file that is sent via a Channel which supports zero-copy file transfer .

 

注意:文件零拷貝傳輸對JDK版本和操做系統是有要求的:

FileChannel.transferTo(long, long, WritableByteChannel) has at least four known bugs in the old versions of Sun JDK and perhaps its derived ones. Please upgrade your JDK to 1.6.0_18 or later version if you are going to use zero-copy file transfer.
If your operating system (or JDK / JRE) does not support zero-copy file transfer, sending a file with FileRegion might fail or yield worse performance. For example, sending a large file doesn't work well in Windows.
Not all transports support it

 

接口結構以下:

下面對新增方法的解釋:

count:Returns the number of bytes to transfer.

position:Returns the offset in the file where the transfer began.

transferred:Returns the bytes which was transfered already.

transferTo:Transfers the content of this file region to the specified channel.

AbstractReferenceCounted

這個類是經過一個變量來記錄引用的增長或減小狀況。

類結構以下:

先來當作員變量:

refCnt就是內部記錄引用數的一個volatile類型的變量,refCntUpdater是一個 AtomicIntegerFieldUpdater 類型常量,AtomicIntegerFieldUpdater 基於反射原子性更新某個類的 volatile 類型成員變量。

A reflection-based utility that enables atomic updates to designated volatile int fields of designated classes. 
This class is designed for use in atomic data structures in which several fields of the same node are independently subject to atomic updates. Note that the guarantees of the compareAndSet method in this class are weaker than in other atomic classes.
Because this class cannot ensure that all uses of the field are appropriate for purposes of atomic access,
it can guarantee atomicity only with respect to other invocations of compareAndSet and set on the same updater.

 

方法以下:

1. 設置或獲取 refCnt 變量

2. 增長引用:

3. 減小引用:

 

AbstractFileRegion

AbstractFileRegion 繼承了AbstractReferenceCounted, 但他仍是一個抽象類,只是實現了部分的功能,以下:

DefaultFileRegion

 文檔說明以下:

Default FileRegion implementation which transfer data from a FileChannel or File. 
Be aware that the FileChannel will be automatically closed once refCnt() returns 0.

 

先來看一下它主要的成員變量:

f:是指要傳輸的源文件。

file:是指要傳輸的源FileChannel

position:傳輸開始的字節位置

count:總共須要傳輸的字節數量

transferred:指已經傳輸的字節數量

 

關鍵方法 transferTo 的源碼以下:

思路:先計算出剩餘須要傳輸的字節的總大小。而後從 position 的相對位置開始傳輸到指定的target sink。

注意:position是指相對於position最初開始位置的大小,絕對位置爲 this.position + position。

其中,open 方法以下,它返回一個隨機讀取文件的 FileChannel 對象。

其deallocate 方法以下:

思路:直接關閉,取消成員變量對於FileChannel的引用,便於垃圾回收時能夠回收FileChannel,而後關閉FileChannel便可。

 

總結:它經過 RandomeAccessFile 獲取 能夠支持隨機訪問 FileChannelImpl 的FileChannel,而後根據相對位置計算出絕對位置以及須要傳輸的字節總大小,最後將數據傳輸到target。

其引用計數的處理調用其父類 AbstractReferenceCounted的對應方法。

ReadableChannelFileRegion

其源碼以下:

其內部的buffer 的大小時 64KB,_traferred 變量記錄了已經傳輸的字節數量。ReadableByteChannel 是按順序讀的,因此pos參數沒有用。

 

下面,重點對DiskStore作一下剖析。 

DiskStore

它就是用來保存block 到磁盤的。

 構造方法以下:

它有三個成員變量:

blockSizes 記錄了每個block 的blockId 和其大小的關係。能夠經過get 方法獲取指定blockId 的block大小。以下:

 

putBytes方法以下:

putBytes將數據寫入到磁盤中;getBytes獲取的是BlockData數據,注意如今只是返回文件的引用,文件的內容並無返回,使得上文所講的多種多樣的BlockData轉換操做直接對接FileChannel,即本地文件,能夠充分發揮零拷貝等特性,數據傳輸效率會更高。

其中put 方法以下:

思路很簡單,先根據diskManager獲取到block在磁盤中的文件的抽象 -- File對象,而後獲取到filechannel,調用回調函數將數據寫入到本地block文件中,最後記錄block和其block大小,最後關閉out channel。若是中途拋出異常,則格式化已寫入的數據,確保數據的寫入是原子化操做(要麼全成功,要麼全失敗)。

put方法依賴的方法以下:

openForWrite方法,先獲取filechannel,而後若是數據有加密,在建立加密的channel用來處理加密的數據

總結:本篇文章介紹了維護blockId和block物理文件的映射關係的DiskBlockManager;Hadoop yarn定位LOCAL_DIRS環境變量是如何定義的;定義了block的存儲方式以及轉換成流或channel或其餘對象的BlockData接口以及它的三個具體的實現,順便介紹了directByteBuffer內存清理機制--Cleaner以及相關類的解釋;用做數據傳輸的DefaultFileRegion和ReadableChannelFileRegion類以及其相關類;最後介紹了磁盤存儲裏的重頭戲--DiskStore,並重點介紹了其用於存儲數據和刪除數據的方法。

不足之處:本篇文章對磁盤IO中的nio以及netty中的相關類介紹的不是很詳細,能夠閱讀相關文檔作進一步理解。畢竟如何高效地和磁盤打交道也是比較重要的技能。後面有機會可能會對java的集合io多線程jdk部分的源碼作一次完全剖析,但那是後話了。目前打算先把spark中認爲本身比較重要的梳理一遍。

相關文章
相關標籤/搜索