上篇spark 源碼分析之十五 -- Spark內存管理剖析 講解了Spark的內存管理機制,主要是MemoryManager的內容。跟Spark的內存管理機制最密切相關的就是內存存儲,本篇文章主要介紹Spark內存存儲。html
跟內存存儲的相關類的關係以下:java
MemoryStore是負責內存存儲的類,其依賴於BlockManager、SerializerManager、BlockInfoManager、MemoryManager。數組
BlockManager是BlockEvictionHandler的實現類,負責實現dropFromMemory方法,必要時從內存中把block丟掉,可能會轉儲到磁盤上。緩存
SerializerManager是負責持久化的一個類,能夠參考文章spark 源碼分析之十三 -- SerializerManager剖析作深刻了解。app
BlockInfoManager是一個實現了對block讀寫時的一個鎖機制,具體能夠看下文。ide
MemoryManager 是一個內存管理器,從Spark 1.6 之後,其存儲內存池大小和執行內存池大小是能夠動態擴展的。即存儲內存和執行內存必要時能夠從對方內存池借用空閒內存來知足本身的使用需求。能夠參考文章 spark 源碼分析之十五 -- Spark內存管理剖析 作深刻了解。源碼分析
BlockInfo 保存了跟block相關的信息。post
BlockId的name不一樣的類型有不一樣的格式,表明不一樣的block類型。學習
StorageLevel 表示block的存儲級別,它自己是支持序列化的。ui
當存儲一個集合爲序列化字節數組時,失敗的結果由 PartiallySerializedBlock 返回。
當存儲一個集合爲Java對象數組時,失敗的結果由 PartiallyUnrolledIterator 返回。
RedirectableOutputStream 是對另外一個outputstream的包裝outputstream,負責直接將數據中轉到另外一個outputstream中。
ValueHolder是一個內存中轉站,其有一個getBuilder方法能夠獲取到MemoryEntryBuilder對象,該對象會負責將中轉站的數據轉換爲對應的能夠保存到MemStore中的MemoryEntry。
咱們逐個來分析其源碼:
它記錄了block 的相關信息。
level: StorageLevel 類型,表明block的存儲級別
classTag:block的對應類,用於選擇序列化類
tellMaster:block 的變化是否告知master。大部分狀況下都是須要告知的,除了廣播的block。
size: block的大小(in byte)
readerCount:block 讀的次數
writerTask:當前持有該block寫鎖的taskAttemptId,其中 BlockInfo.NON_TASK_WRITER 表示非 task任務 持有鎖,好比driver線程,BlockInfo.NO_WRITER 表示沒有任何代碼持有寫鎖。
A Block can be uniquely identified by its filename, but each type of Block has a different set of keys which produce its unique name. If your BlockId should be serializable, be sure to add it to the BlockId.apply() method.
其子類,在上圖中已經標明。
文檔介紹以下:
Component of the BlockManager which tracks metadata for blocks and manages block locking. The locking interface exposed by this class is readers-writer lock. Every lock acquisition is automatically associated with a running task and locks are automatically released upon task completion or failure. This class is thread-safe.
它有三個成員變量,以下:
infos 保存了 Block-id 和 block信息的對應關係。
writeLocksByTask 保存了每個任務和任務持有寫鎖的block-id
readLockByTasks 保存了每個任務和任務持有讀鎖的block-id,由於讀鎖是可重入的,因此 ConcurrentHashMultiset 是支持多個重複值的。
方法以下:
1. 註冊task
2. 獲取當前task
3. 獲取讀鎖
思路:若是block存在,而且沒有task在寫,則直接讀便可,不然進入鎖等待區等待。
4. 獲取寫鎖
思路:若是block存在,且沒有task在讀,也沒有task在寫,則在寫鎖map上記錄task,表示已獲取寫鎖,不然進入等待區等待
5. 斷言有task持有寫鎖寫block
6. 寫鎖降級
思路:首先把和block綁定的task取出並和當前task比較,如果同一個task,則調用unlock方法
7. 釋放鎖:
思路:若當前任務持有寫鎖,則直接釋放,不然讀取次數減1,而且從讀鎖記錄中刪除一條讀鎖記錄。最後喚醒在鎖等待區等待的task。
8. 獲取爲寫一個新的block獲取寫鎖
9. 釋放掉指定task的全部鎖
思路:先獲取該task的讀寫鎖記錄,而後移除寫鎖記錄集中的每一條記錄,移除讀鎖記錄集中的每一條讀鎖記錄。
10. 移除並釋放寫鎖
讀寫鎖記錄清零,解除block-id和block信息的綁定。
還有一些查詢方法,再也不作詳細說明。
簡單總結一下:
讀鎖支持可重入,便可以重複獲取讀鎖。能夠獲取讀鎖的條件是:沒有task在寫該block,對有沒有task在讀block沒有要求。
寫鎖當且僅當一個task獲取,能夠獲取寫鎖的條件是:沒有task在讀block,沒有task在寫block。
注意,這種設計能夠用在一個block的讀的次數遠大於寫的次數的狀況下。咱們能夠來作個假設:假設一個block寫的次數遠超過讀的次數,同時多個task寫同一個block的操做就變成了串行的,寫的效率,由於只有一個BlockInfoManager對象,即一個鎖,即全部在鎖等待區等待的writer們都在競爭一個鎖。對於讀的次數遠超過寫的次數狀況下,reader們能夠肆無忌憚地讀取數據數據,基本處於無鎖狀況下,幾乎沒有了鎖切換帶來的開銷,而且能夠容許不一樣task同時讀取同一個block的數據,讀的吞吐量也提升了。
總之,BlockInfoManager本身實現了block的一套讀寫鎖機制,這種讀寫鎖的設計思路是很是經典和值得學習的。
文檔說明:
A wrapper which allows an open [[OutputStream]] to be redirected to a different sink.
即這個類能夠將outputstream重定向到另外一個outputstream。
源碼也很簡單:
os成員變量就是重定向的目標outputstream
memoryEntry本質上就是內存中一個block,指向了存儲在內存中的真實數據。
如上圖,它有兩個子類:
其中,DeserializedMemoryEntry 是用來保存反序列化以後的java對象數組的,value是一個數據,保存着真實的反序列化數據,size表示,classTag記錄着數組中被擦除的數據的Class類型,這種數據只能保存在堆內內存中。
SerializedMemoryEntry 是用來保存序列化以後的ByteBuffer數組的,buffer中記錄的是真實的Array[ByteBuffer]數據。memoryMode表示數據存儲的內存區域,堆外內存仍是堆內內存,classTag記錄着序列化前被擦除的數據的Class類型,size表示字節數據大小。
build方法將內存數據構建到MemoryEntry中
本質上來講,就是一個內存中轉站。數據被臨時寫入到這個中轉站,而後調用其getBuilder方法獲取 MemoryEntryBuilder 對象,這個對象用於構建MemoryEntry 對象。
storeValues用於寫入數據,estimateSize用於評估holder中內存的大小。調用getBuilder以後會返回 MemoryEntryBuilder對象,後續能夠拿這個builder建立MemoryEntry
調用getBuilder以後,會關閉流,禁止數據寫入。
它有兩個子類:用於中轉Java對象的DeserializedValuesHolder和用於中轉字節數據的SerializedValuesHolder。
其實現類具體以下:
接下來,咱們看一下Spark內存存儲中的重頭戲 -- MemoryStore
文檔說明:
Stores blocks in memory, either as Arrays of deserialized Java objects or as serialized ByteBuffers.
類內部結構以下:
對成員變量的說明:
entries 本質上就是在內存中保存blockId和block內容的一個map,它的 accessOrder爲true,即最近訪問的會被移動到鏈表尾部。
onHeapUnrollMemoryMap 記錄了taskAttemptId和須要攤開一個block須要的堆內內存大小的關係
offHeapUnrollMemoryMap 記錄了taskAttemptId和須要攤開一個block須要的堆外內存大小的關係
unrollMemoryThreshold 表示在攤開一個block 以前給request分配的初始內存,能夠經過 spark.storage.unrollMemoryThreshold 來調整,默認是 1MB
下面,開門見山,直接剖析比較重要的方法:
1. putBytes:這個方法只被BlockManager調用,其中_bytes回調用於生成直接被緩存的ChunkedByteBuffer:
思路:先從MemoryManager中申請內存,若是申請成功,則調用回調方法 _bytes 獲取ChunkedByteBuffer數據,而後封裝成 SerializedMemoryEntry對象 ,最後將封裝好的SerializedMemoryEntry對象緩存到 entries中。
2. 把迭代器中值保存爲內存中的Java對象
思路:轉換爲DeserializedValueHolder對象,進而調用putIterator方法,ValueHolder就是一個抽象,使得putIterator既能夠緩存序列化的字節數據又能夠緩存Java對象數組。
3. 把迭代器中值保存爲內存中的序列化字節數據
思路:轉換爲 SerializedValueHolder 對象,進而調用putIterator方法。
MAX_ROUND_ARRARY_LENGTH和unrollMemoryThreshold的定義以下:
1 public static int MAX_ROUNDED_ARRAY_LENGTH = Integer.MAX_VALUE - 15;
2 private val unrollMemoryThreshold: Long = conf.getLong("spark.storage.unrollMemoryThreshold", 1024 * 1024)
unrollMemoryThreshold 默認是 1MB,能夠經過 spark.storage.unrollMemoryThreshold 參數調整大小。
4. putIterator方法由參數ValueHolder,使得緩存字節數據和Java對象能夠放到一個方法來。 方法2跟3 都調用了 putIterator 方法,以下:
思路:
第一步:定義攤開內存初始化大小,攤開內存增加率,攤開內存檢查頻率等變量。
第二步:向MemoryManager請求申請攤開初始內存,若成功,則記錄這筆攤開內存。
第三步:而後進入223~240行的while循環,在這個循環裏:
第四步:
若上一次向MemoryManager申請內存成功,則從ValueHolder中獲取builder,而且計算準確內存開銷。查看準確內存是否大於了已分配內存,若大於,則請求MemoryManager分配內存,並將分配的內存累加到已分配內存中。
不然,不然打印內存使用狀況,返回爲攤開該block申請的內存
第五步:
若上一次向MemoryManager申請內存成功,首先調用MemoryEntryBuilder的build方法構建出能夠直接存入內存的MemoryEntry,並向MemoryManager請求釋放攤開內存,申請存儲內存,並確保存儲內存申請成功。最後將數據存入內存的entries中。
不然打印內存使用狀況,返回爲攤開該block申請的內存
其實以前不是很理解unroll這個詞在這裏的含義,一直譯做攤開,它其實指的就是集合的數據轉儲到中轉站這個操做,攤開內存指這個操做須要的內存。
下面來看一下這個方法裏面依賴的常量和方法:
4. 1 unrollMemoryThreshold 在上一個方法已作說明。UNROLL_MEMORY_CHECK_PERIOD 和 UNROLL_MEMORY_GROWTH_FACTOR 常量定義以下:
即,UNROLL_MEMORY_CHECK_PERIOD默認是16,UNROLL_MEMORY_GROWTH_FACTOR 默認是 1.5
4.2 reserveUnrollMemoryForThisTask方法源碼以下,思路大體上是先從MemoryManager 申請攤開內存,若成功,則根據memoryMode在堆內或堆外記錄攤開內存的map上記錄新分配的內存。
4.3 releaseUnrollMemoryForThisTask方法以下,實現思路:先根據memoryMode獲取到對應記錄堆內或堆外內存的使用狀況的map,而後在該task的攤開內存上減去這筆內存開銷,若是減完以後,task使用內存爲0,則直接從map中移除對該task的內存記錄。
4.4 日誌打印block攤開內存和當前內存使用狀況
5. 獲取緩存的值:
思路:直接根據blockId從entries中取出MemoryEntry數據,而後根據MemoryEntry類型取出數據便可。
6. 移除Block或清除緩存,比較簡單,不作過多說明:
7. 嘗試驅逐block來釋放指定大小的內存空間來存儲給定的block,代碼以下:
該方法有三個參數:要分配內存的blockId,block大小,內存類型(堆內仍是堆外)。
第 469~485 行:dropBlock 方法思路: 先從MemoryEntry中獲取data,再調用 BlockManager從內存中驅逐出該block,若是該block 的StorageLevel容許落地到磁盤,則先落到磁盤,再從內存中刪除之,最後更新該block的StorageLevel,最後檢查新的StorageLevel,若該block還在內存或磁盤中,則釋放鎖,不然,直接從BlockInfoManager中刪除之。
第 443 行: 找到block對應的rdd。
第451~467 行:先給entries上鎖,而後遍歷entries集合,檢查block 是否能夠從內存中驅逐,若能夠則把它加入到selectedBlocks集合中,並把該block大小累加到freedMemory中。
461行的 lockForWriting 方法,不堵塞,即若是第一次拿不到寫鎖,則一直不停地輪詢,直到能夠拿到寫鎖爲止。那麼問題來了,爲何要先獲取寫鎖呢?由於寫鎖具備排他性而且不具有可重入性,一旦拿到寫鎖,其餘鎖就不能再訪問該block了。
487行~ 528 行:若計劃要釋放的內存小於存儲新block須要的內存大小,則直接釋放寫鎖,不從內存中驅逐以前選擇的block,直接返回。
若計劃要釋放的內存不小於存儲新block須要的內存大小,則遍歷以前選擇的每個block,獲取entry,並調用dropMemory方法,返回釋放的內存大小。finally 代碼塊是防止在dropMemory過程當中,該線程被中斷,其他block寫鎖不能被釋放的狀況。
其依賴的方法以下:
存儲內存失敗以後,會返回 PartiallySerializedBlock 或者 PartiallyUnrolledIterator。
PartiallyUnrolledIterator 是一個Iterator,能夠用來遍歷block數據,同時負責釋放攤開內存。
PartiallySerializedBlock 它能夠將失敗的block轉化成 PartiallyUnrolledIterator 用來遍歷,能夠直接丟棄失敗的block,也能夠把數據轉儲到給定的能夠落地的outputstream中,同時釋放攤開內存。
總結:
本篇文章主要講解了Spark的內存存儲相關的內容,重點講解了BlockInfoManager實現的鎖機制、跟ValuesHolder中轉站相關的MemoryEntry、EmmoryEntryBuilder等相關內容以及內存存儲中的重頭戲 -- MemStore相關的Block存儲、Block釋放、爲新Block驅逐內存等等功能。