不少初學者其實對Spark的編程模式仍是RDD這個概念理解不到位,就會產生一些誤解。編程
好比,不少時候咱們經常覺得一個文件是會被完整讀入到內存,而後作各類變換,這極可能是受兩個概念的誤導:分佈式
若是你沒有主動對RDDCache/Persist,它不過是一個概念上存在的虛擬數據集,你其實是看不到這個RDD的數據的全集的(他不會真的都放到內存裏)。函數
一個RDD 本質上是一個函數,而RDD的變換不過是函數的嵌套。RDD我認爲有兩類:oop
咱們如下面的代碼爲例作分析:spa
sc.textFile("abc.log").map().saveAsTextFile("")
因此RDD不過是對一個函數的封裝,當一個函數對數據處理完成後,咱們就獲得一個RDD的數據集(是一個虛擬的,後續會解釋)。code
NewHadoopRDD是數據來源,每一個parition負責獲取數據,得到過程是經過iterator.next 得到一條一條記錄的。假設某個時刻拿到了一條數據A,這個A會馬上被map裏的函數處理獲得B(完成了轉換),而後開始寫入到HDFS上。其餘數據重複如此。因此整個過程:orm
因此整個過程實際上是流式的過程,一條數據被各個RDD所包裹的函數處理。內存
剛纔我反覆提到了嵌套函數,怎麼知道它是嵌套的呢?it
若是你寫了這樣一個代碼:io
sc.textFile("abc.log").map().map().........map().saveAsTextFile("")
有成千上萬個map,極可能就堆棧溢出了。爲啥?其實是函數嵌套太深了。
按上面的邏輯,內存使用實際上是很是小的,10G內存跑100T數據也不是難事。可是爲何Spark經常由於內存問題掛掉呢? 咱們接着往下看。
這就是爲何要分Stage了。每一個Stage其實就是我上面說的那樣,一套數據被N個嵌套的函數處理(也就是你的transform動做)。遇到了Shuffle,就被切開來,所謂的Shuffle,本質上是把數據按規則臨時都落到磁盤上,至關於完成了一個saveAsTextFile的動做,不過是存本地磁盤。而後被切開的下一個Stage則以本地磁盤的這些數據做爲數據源,從新走上面描述的流程。
咱們再作一次描述:
所謂Shuffle不過是把處理流程切分,給切分的上一段(咱們稱爲Stage M)加個存儲到磁盤的Action動做,把切分的下一段(Stage M+1)數據源變成Stage M存儲的磁盤文件。每一個Stage均可以走我上面的描述,讓每條數據均可以被N個嵌套的函數處理,最後經過用戶指定的動做進行存儲。
前面咱們提到,Shuffle不過是偷偷的幫你加上了個相似saveAsLocalDiskFile
的動做。然而,寫磁盤是一個高昂的動做。因此咱們儘量的把數據先放到內存,再批量寫到文件裏,還有讀磁盤文件也是給費內存的動做。把數據放內存,就遇到個問題,好比10000條數據,到底會佔用多少內存?這個其實很難預估的。因此一不當心,就容易致使內存溢出了。這其實也是一個很無奈的事情。
其實就是給某個Stage加上了一個saveAsMemoryBlockFile
的動做,而後下次再要數據的時候,就不用算了。這些存在內存的數據就表示了某個RDD處理後的結果。這個纔是說爲啥Spark是內存計算引擎的地方。在MR裏,你是要放到HDFS裏的,但Spark容許你把中間結果放內存裏。
咱們從一個較新的角度解釋了RDD 和Shuffle 都是一個什麼樣的東西。