1. 數據序列化html
默認使用的是Java自帶的序列化機制。優勢是能夠處理全部實現了java.io.Serializable 的類。
可是Java 序列化比較慢。java
可使用Kryo序列化機制,一般比Java 序列化機制性能高10倍。可是並不支持全部實現了java.io.Serializable 的類。使用 conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer") 開啓Kryo序列化。不使用Kryo作爲默認值的緣由是:須要註冊自定義的類。例如:web
val conf = new SparkConf().setMaster(...).setAppName(...) conf.registerKryoClasses(Array(classOf[MyClass1], classOf[MyClass2])) val sc = new SparkContext(conf)
注意:若是Object很大,須要在配置中增長 spark.kryoserializer.buffer 的值。若是沒有在Kryo中註冊自定義的類,Kryo也能正常工做,這些類會徹底地保存下來(等於沒有序列化就進行傳輸或保存了),會形成資源浪費。apache
2. 內存調優api
能夠考慮3個方面:(1)對象須要的總內存 (2)指向這些對象的指針 (3)GC數組
一般狀況下,指針佔用的空間將是原始數據的2~5倍。有如下幾個緣由:數據結構
(1)Java對象的「object header」(對象頭),包含了指向它的類的指針,佔用16bytes。對於一些只有不多數據的object,16bytes要比對象自己佔用的空間要多。oracle
(2)Java String 中在原始String數據的基礎上有另外40bytes的開銷(String的保存形式是Char的數組,而且有length的額外數據)。由於String內部使用UTF-16編碼,每一個char 佔用2個byte。所以10個字符的String,將會很輕易地佔用60個bytes性能
(3)諸如HashMap,LinkedList 的集合類,使用鏈式結構,每一個entry(Map.Entry)都有一個包裝類。這些類不只有「object header」,還有指向下一個對象的指針(一般是8個bytes)。優化
(4)基本類型的集合,一般會被包裝成對象類型。
3. 內存管理
Spark中的內存使用主要有兩類:執行內存和存儲內存。執行內存是指shuffles, joins, sorts and aggregations計算時用到的內存。存儲內存主要是指cache和集羣間傳播的內部數據用到的內存。執行內存和存儲內存使用的是同一塊區域。當沒有計算執行時, 存儲將得到全部這塊區域的可用內存,反之亦然。執行比存儲具備更高的獲取內存的優先級,也就是說,若是內存不夠時,存儲會釋放一部份內存給執行用,直到存儲須要的最低的閥值。
有兩個相關的配置,可是一般來講,用戶不須要改變其默認值。
(1) spark.memory.fraction 表示使用的Java 堆內存的比例。默認值0.6. 剩下的40%的內存用於:(a)存儲用戶數據、Spark內部元數據 (b)防止OOM
(2)spark.memory.storageFraction 表示上面所說的存儲內存最少佔用的比例。默認值 是0.5
4. 肯定內存消耗
最好的方式是生成一個RDD並cache,在web UI 中的 Storage 中查看佔用了多少內存。
肯定一個指定object 佔用內存的大小,可使用 SizeEstimator.estimate(obj) 方法。
5. 調整數據結構
減小內存消耗,首先應該避免使用基於指針的數據結構和包裝對象等諸如此類的Java特性。有如下幾種途徑:
(1)數據結構優先使用對象數組和基本類型,儘可能不使用Java和scala裏的集合類(如:HashMap)。可使用 fastutil (http://fastutil.di.unimi.it/) 提供的集合類和基本類型。
(2)儘可能避免使用有不少小對象和指針的內嵌結構
(3)考慮使用數字ID 和枚舉類代替做爲key的String
(4)若是內存小於32GB,在Spark-env.sh 裏設置 -XX:+UseCompressedOops,這樣指針使用4bytes 而不是8bytes
6. 序列化RDD 存儲
當你的 object 仍然很大,簡單的下降內存消耗的方法是使用序列化的存儲方法。強烈建議使用kyro序列化機制。
7. 垃圾回收調優
垃圾回收的時間主要是花費在尋找那些再也不被引用的對象。所以它跟Java Object 的數量有關。咱們應該使用具備較少object的數據結構(如:使用array代替linkedList)。一種較好的方法是用序列化的形式持久化Object,這樣每一個RDD partition 只有一個字節數組。
測量GC的影響:在Java option 中加入 -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTimeStamps 後,可在worker 的 stdout 中找到GC的日誌。
(1) 在任務完成以前,若是有屢次full GC,說明執行任務的內存不夠
(2) 若是有屢次minor GC,可是 full GC 並很少,能夠增大 Eden 區的大小
(3) 在GC的日誌中,若是老年代快滿了,減小 spark.memory.fraction 以下降cache所用的內存
(4) 嘗試使用 G1 垃圾回收器(-XX:+UseG1GC)。若是堆比較大,應該增長 G1 區的大小(經過 -XX:G1HeapRegionSize 設置
)
(5) 若是任務是從HDFS上讀數據,HDFS 塊的大小爲 128M,塊解壓後的大小通常爲原始大小的2~3倍,若是要運行4個task,能夠估算Eden區須要 4*3*128M。
8. 其它
(1)並行度。除非你手動每步都設置較高的並行度,不然,集羣不會被最大化地利用。Spark會自動根據每一個文件的大小設置相應的task數量。對於諸如groupByKey,reduceByKey 等 reduce 操做,並行度爲最大的父 RDD 的 partition 的數量。能夠配置 spark.default.parallelism 設置默認的並行度。通常來說,建議一個CPU 運行 2~3個task。
(2)Reduce Task 的內存使用。有時候,發生OOM並非由於內存中放不下RDD,而是由於某個或幾個task 分配的內存不夠。例如:某個groupByKey 操做處理很大的數據集(由於數據傾斜的緣故)。 簡單的解決方法是:設置較高的並行度。
(3)廣播大的變量。 使用廣播的功能能有效地減小序列化的 task 的大小和集羣加載job的花消。若是你的task中須要使用一個來自driver的大的object(如:靜態查詢表),應該把它轉化成廣播變量。 Master端會打印序列化後的 task 的大小,一般若是大於20KB 的話,就值得去優化。
(4)數據本地性。數據本地性可分爲如下幾類:
(a) PROCESS_LOCAL 數據在運行代碼的JVM中。
(b) NODE_LOCAL 數據和運行的代碼在同一臺機器上。如:當前節點上正好有HDFS的數據塊。
(c) NO_PREF 數據能夠較快獲取,可是不在本地
(d) RACK_LOCAL 數據在同一 機架內,須要經過network獲取
(e) Any 除上述外的數據
最好的狀況就是 task 都運行在最好的數據本地性的環境,但一般不太可能。不少時候,某個executor 上的任務都完成了,而其它忙碌的機器上尚有未處理的data。Spark一般會等一段時間,以等待忙碌的機器空閒下來去處理數據(由於具備較高的本地性)。當超過這個等待時間後,空間的executor會把這些數據拉過來進行處理。每一個數據本地性級別對應的等待時間能夠查看配置中的 spark.locality
部分。一般默認的配置工做得蠻好的。若是你的task運行時間較長,能夠增長這些值。