Spark會把數據都載入到內存麼?

前言

不少初學者其實對Spark的編程模式仍是RDD這個概念理解不到位,就會產生一些誤解。編程

好比,不少時候咱們經常覺得一個文件是會被完整讀入到內存,而後作各類變換,這極可能是受兩個概念的誤導:分佈式

  1. RDD的定義,RDD是一個分佈式的不可變數據集合
  2. Spark 是一個內存處理引擎

若是你沒有主動對RDDCache/Persist,它不過是一個概念上存在的虛擬數據集,你其實是看不到這個RDD的數據的全集的(他不會真的都放到內存裏)。函數

RDD的本質是什麼

一個RDD 本質上是一個函數,而RDD的變換不過是函數的嵌套。RDD我認爲有兩類:oop

  1. 輸入RDD,典型如KafkaRDD,JdbcRDD
  2. 轉換RDD,如MapPartitionsRDD

咱們如下面的代碼爲例作分析:spa

sc.textFile("abc.log").map().saveAsTextFile("")
  • textFile 會構建出一個NewHadoopRDD,
  • map函數運行後會構建出一個MapPartitionsRDD
  • saveAsTextFile觸發了實際流程代碼的執行

因此RDD不過是對一個函數的封裝,當一個函數對數據處理完成後,咱們就獲得一個RDD的數據集(是一個虛擬的,後續會解釋)。code

NewHadoopRDD是數據來源,每一個parition負責獲取數據,得到過程是經過iterator.next 得到一條一條記錄的。假設某個時刻拿到了一條數據A,這個A會馬上被map裏的函數處理獲得B(完成了轉換),而後開始寫入到HDFS上。其餘數據重複如此。因此整個過程:orm

  • 理論上某個MapPartitionsRDD裏實際在內存裏的數據等於其Partition的數目,是個很是小的數值。
  • NewHadoopRDD則會略多些,由於屬於數據源,讀取文件,假設讀取文件的buffer是1M,那麼最多也就是partitionNum*1M 數據在內存裏
  • saveAsTextFile也是同樣的,往HDFS寫文件,須要buffer,最多數據量爲 buffer* partitionNum

因此整個過程實際上是流式的過程,一條數據被各個RDD所包裹的函數處理。內存

剛纔我反覆提到了嵌套函數,怎麼知道它是嵌套的呢?it

若是你寫了這樣一個代碼:io

sc.textFile("abc.log").map().map().........map().saveAsTextFile("")

有成千上萬個map,極可能就堆棧溢出了。爲啥?其實是函數嵌套太深了。

按上面的邏輯,內存使用實際上是很是小的,10G內存跑100T數據也不是難事。可是爲何Spark經常由於內存問題掛掉呢? 咱們接着往下看。

Shuffle的本質是什麼?

這就是爲何要分Stage了。每一個Stage其實就是我上面說的那樣,一套數據被N個嵌套的函數處理(也就是你的transform動做)。遇到了Shuffle,就被切開來,所謂的Shuffle,本質上是把數據按規則臨時都落到磁盤上,至關於完成了一個saveAsTextFile的動做,不過是存本地磁盤。而後被切開的下一個Stage則以本地磁盤的這些數據做爲數據源,從新走上面描述的流程。

咱們再作一次描述:

所謂Shuffle不過是把處理流程切分,給切分的上一段(咱們稱爲Stage M)加個存儲到磁盤的Action動做,把切分的下一段(Stage M+1)數據源變成Stage M存儲的磁盤文件。每一個Stage均可以走我上面的描述,讓每條數據均可以被N個嵌套的函數處理,最後經過用戶指定的動做進行存儲。

爲何Shuffle 容易致使Spark掛掉

前面咱們提到,Shuffle不過是偷偷的幫你加上了個相似saveAsLocalDiskFile的動做。然而,寫磁盤是一個高昂的動做。因此咱們儘量的把數據先放到內存,再批量寫到文件裏,還有讀磁盤文件也是給費內存的動做。把數據放內存,就遇到個問題,好比10000條數據,到底會佔用多少內存?這個其實很難預估的。因此一不當心,就容易致使內存溢出了。這其實也是一個很無奈的事情。

咱們作Cache/Persist意味着什麼?

其實就是給某個Stage加上了一個saveAsMemoryBlockFile的動做,而後下次再要數據的時候,就不用算了。這些存在內存的數據就表示了某個RDD處理後的結果。這個纔是說爲啥Spark是內存計算引擎的地方。在MR裏,你是要放到HDFS裏的,但Spark容許你把中間結果放內存裏。

總結

咱們從一個較新的角度解釋了RDD 和Shuffle 都是一個什麼樣的東西。

相關文章
相關標籤/搜索