引言:在多臺機器上分佈數據以及處理數據是Spark的核心能力,即咱們所說的大規模的數據集處理。爲了充分利用Spark特性,應該考慮一些調優技術。本文每一小節都是關於調優技術的,並給出瞭如何實現調優的必要步驟。
本文選自《Spark GraphX實戰》。算法
咱們知道Spark 能夠經過 RDD 實現計算鏈的原理 :轉換函數包含在 RDD 鏈中,但僅在調用 action 函數後纔會觸發實際的求值過程,執行分佈式運算,返回運算結果。要是在 同一 RDD 上重複調用 action 會發生什麼?shell
通常 RDD 不會保留運算結果,若是再次調用 action 函數,整個 RDD 鏈會從新 運算。有些狀況下這不會有問題,可是對於許多機器學習任務和圖處理任務,這就 是很大的問題了。一般須要屢次迭代的算法,在同一個 RDD 上執行不少次,反覆 地從新加載數據和從新計算會致使時間浪費。更糟糕的是,這些算法一般須要很長 的 RDD 鏈。
看來咱們須要另外一種方式來充分利用集羣可用內存來保存 RDD 的運算結果。 這就是 Spark 緩存(緩存也是 Spark 支持的一種持久化類型)。
要在內存中緩存一個 RDD,能夠調用 RDD 對象的 cache 函數。如下在 spark- shell 中執行的代碼,會計算文件的總行數,輸出文件內容 :apache
val filename = "..."val rdd1 = sc.textFile(filename).cacherdd1.countrdd1.collect
若是不調用 cache 函數,當 count 和 collect 這兩個 action 函數被調用時, 會致使執行從存儲系統中讀文件兩次。調用了 cache 函數,第一個 action 函數(count 函數)會把它的運算結果保留在內存中,在執行第二個 action 函數(collection 函數)時,會直接在使用緩存的數據上繼續運算,而不須要從新計算整個 RDD 鏈。 即便經過轉換緩存的 RDD,生成新的 RDD,緩存的數據仍然可用。下面的代碼會找出全部的註釋行(以 # 開始的行數據)。緩存
val rdd2 =rdd1.filter(_.startsWith("#")) rdd2.collect
由於 rdd2 源於已緩存的 rdd1,rdd1 已經把它的運算結果緩存在內存中了, 因此 rdd2 也就不須要從新從存儲系統中讀取數據。微信
注意:cache 方法做爲一個標誌表示 RDD 應當緩存,但並非當即緩存。 緩存發生在當前 RDD 在下一次要被計算的時候。網絡
如上所述,緩存是其中一種持久化類型。下表列出了 Spark 支持的全部持久 化等級。
每一個持久化等級都定義在單例對象 StorageLevel 中。例如,調用 rdd.persist(StorageLevel.MEMORY_AND_DISK)方法會把 RDD 設置成內存和磁盤緩 存。 cache 方法內部也是調用 rdd.persist(StorageLevel.MEMORY_ONLY)。框架
注意 :其餘的持久化等級,如 MEMORY_ONLY二、MEMORY_AND_ DISK2 等,也是可用的。它們會複製 RDD到集羣的其餘節點上,以便 提供容錯能力。這些內容超出了本書範圍,感興趣的讀者能夠看看 Petar Zec' evic' 和 Marko
Bonac' i(Manning, 2016)的書 Spark in Action,這本書更 深刻地介紹了 Spark 容錯方面的內容。機器學習
不管何時,經過 Graph 對象調用一些函數如 mapVertices 或 aggregateMessages, 這些操做都是基於下層的 RDD 實現的。
Graph 對象提供了基於頂點 RDD 和邊 RDD 方便的緩存和持久化方法。分佈式
雖然看起來緩存是一個應該被處處使用的好東西,可是用得太多也會讓人過分依賴它。
當緩存愈來愈多的 RDD 後,可用的內存就會減小。最終 Spark 會把分區數據從 內存中逐出(使用最少最近使用算法,LRU)。同時,緩存過多的 Java 對象,JVM 垃圾回收高耗是不可避免的。這就是爲何當緩存再也不被使用時頗有必要調用 un- persist 方法。對迭代算法而言,在循環中經常使用下面的方法調用模式 :ide
調用 Graph 的 cache 或 persist 方法。
調用 Graph 的 action 函數,觸發 Graph 下面的 RDD 被緩存……
執行算法主體的其他部分。
在循環體的最後部分,反持久化,即釋放緩存。
提示 :用Pregel API的好處是,它已經在內部作了緩存和釋放緩存的操做。
不能盲目地在內存中緩存 RDD。要考慮數據集會被訪問多少次以及每次訪問時 重計算和緩存的代價對比,重計算也可能比增長內存的方式付出的代價小。
毫無疑問,若是僅僅讀一次數據集,緩存 RDD 就毫無心義,這還會讓做業運 行得更慢,特別是用了有序列化的持久化等級。
圖算法中一個經常使用的模式是用每一個迭代過程當中運算後的新數據更新圖。這意味 着,實際構成圖的頂點 RDD 亦或邊 RDD 的鏈會變得愈來愈長。
定義 :當 RDD 由逐級繼承的祖先 RDD 鏈造成時,咱們說從 RDD 到 根 RDD 的路徑是其譜系。
下面清單所示的示例是一個簡單的算法,可生成一個新頂點集並更新圖。這個 算法迭代的次數由變量 iterations 控制。
上述代碼每一次調用 joinVertices 都會增長一個新 RDD 到頂點 RDD 鏈中。 顯然咱們須要使用緩存來確保在每次迭代中避免從新計算 RDD 鏈,但這並不能改變一個事實,那就是有一個不斷增加的子 RDD 到父 RDD 的對象引用列表。
這樣的後果是,若是運行迭代次數過多,運行的代碼中最終會爆出 Stack- OverflowError 棧溢出錯誤。一般迭代 500次就會出現棧溢出。
而由 RDD 提供而且被 Graph 繼承的一個特性 :checkpointing,能解決長 RDD 譜系問題。下面清單中的代碼示範瞭如何使用 checkpointing,這樣就能夠持續輸出 頂點,更新結果圖。
一個標記爲 checkpointing 的 RDD 會把 RDD 保存到一個 checkpoint 目錄,然 後指向父 RDD 的鏈接被切斷,即切斷了 lineage 譜系。一個標記爲 checkpointing 的 Graph 會致使下面的頂點 RDD 和邊 RDD 作 checkpoint。
調用 SparkContext.setCheckpointDir 來設置 checkpoint 目錄,指定一個 共享存儲系統的文件路徑,如 HDFS。
如前面的代碼清單所示,必須在調用 RDD 任何方法以前調用 checkpoint,這 是由於 checkpointing 是一個至關耗時的過程(畢竟須要把圖寫入磁盤文件),一般 須要不斷地 checkpoint 避免棧溢出錯誤,通常能夠每 100 次迭代作一次 checkpoint。
注意 :一個加速 checkpointing 的選擇是 checkpoint 到 Tachyon(已 改名爲 Alluxio),而不是checkpoint 到標準的文件系 統。Alluxio,來自 AMPLab,是一個「之內存爲中心的有容錯能力的分佈式文件系統,它能讓Spark 這類集羣框架加速訪問共享在內存中的文件」。
內存壓力(內存不夠用)每每是 Spark 應用性能差和容易出故障的主要緣由 之一。這些問題一般表現爲頻繁的、耗時的 JVM 垃圾回收和「內存不足」的錯 誤。checkpointing 在這裏也不能緩解內存壓力。遇到這種問題,首先要考慮序列化 Graph 對象。
定義 :數據序列化,Data serialization,是把 JVM 裏表示的對象實 例轉換(序列化)成字節流 ;把字節流經過網絡傳輸到另外一個 JVM 進程 中 ;在另外一個 JVM 進程中,字節流能夠被「反序列化」爲一個對象實例。Spark用序列化的方式,能夠在網絡間傳輸對象,也能夠把序列化後的字節流緩存在內存中。
要用序列化,能夠選用 persist 中下面的 StorageLevels :
StorageLevel.MEMORY_ONLY_SER
StorageLevel.MEMORY_AND_DISK_SER
序列化節省了空間,同時序列化和反序列化也會增長 CPU 的開銷。
Spark 默認使用 JavaSerializer 來序列化對象,這是一個低效的 Java 序列化框架,一個更好的選擇是選用 Kryo。Kryo 是一個開源的 Java 序列化框架,提供了 快速高效的序列化能力。
Spark 中使用 Kryo 序列 化,只須要設置 spark.serializer 參數爲 org. apache.spark.serializer.KryoSerializer,如這樣設置命令行參數 :
spark-shell --conf "spark.serializer=org.apache.spark.serializer.KryoSerializer"
要是每次都這樣設置參數,會很煩瑣。能夠在 $Spark_HOME/conf/spark-
defaults.conf 這個配置文件中,用標準的屬性文件語法(用 Tab 分隔做爲一行),把 spark.serializer 等參數及其對應的值寫入這個配置文件,以下所示 :
spark.serializer org.apache.spark.serializer.KryoSerializer
爲保證性能最佳,Kryo 要求註冊要序列化的類,若是不註冊,類名也會被序列 化在對象字節碼裏,這樣對性能有較大影響。幸運的是,Spark 對其框架裏用到的 類作了自動註冊 ;可是,若是應用程序代碼裏有自定義的類,剛好這些自定義類也 要用 Kryo 序列化,那就須要調用 SparkConf.registerKryoClasses 函數來手 動註冊。下面的清單展現瞭如何註冊 Person 這個自定義類。
在應用程序調優時,經常須要知道 RDD 的大小。這就很棘手,由於文件或數 據庫中對象的大小和 JVM 中對象佔用多少內存沒有太大關係。
一個小技巧是,先將 RDD 緩存到內存中,而後到 Spark UI 中的 Storage 選項卡, 這裏記錄着 RDD 的大小。要衡量配置了序列化的效果,用這個方法也能夠。
本文選自《Spark GraphX實戰》,點此連接可在博文視點官網查看此書。
想及時得到更多精彩文章,可在微信中搜索「博文視點」或者掃描下方二維碼並關注。