一、RDD[(k,v)] join()優化,join以前會對兩個RDD的key作hash,經過網絡把相同hash值的數據傳到同一個節點,所以對屢次join的RDD 作預分區與持久化可提升效率。html
map()操做會失去父RDD的信息,由於key值有可能發生改變,但 mapValues()、flatMapValues()不會。多父RDD已分區,默認採起第一個父RDD的分區方式緩存
cogroup() 、groupWith() 、 join() 、 leftOuterJoin() 、 rightOuterJoin() 、 groupByKey() 、 reduceByKey() 、combineByKey() 以及 lookup() 等發生跨節點數據混洗的操做均可以進行優化。網絡
RDD.partitionBy( new HashPartitioner(3)).persist(StorageLevel.MEMORY_AND_DISK_SER)//構造3個分區
RDD.partitioner//獲取分區信息
2.累加器,行動操做中每一個任務只會對累加器修改一次,轉換操做也許會因爲緩存移出又從新使用等操做致使屢次修改。(spark1.2)數據結構
只有驅動器能夠讀,對執行器是隻寫變量。app
val accu = sc.accumulator(initialValue)
3.廣播變量,只被發到各節點一次,相似BitTorrent通訊機制,只讀,修改不會影響其餘節點的值,less
val broad = sc.broadcast(T)
4.task、stage、jobide
一個RDD有多少partition 就會生成多少task;一個或多個RDD生成一個stage(通常以shuffle操做爲分隔);一個action 生成一個job性能
spark讀取文件的並行度與HDFS block有關,HBASE region數有關,hive文件不可分割則與文件數有關,可分割文件與分割數有關。未壓縮文件和BZip2Codec壓縮類型可分割。優化
/** This input format overrides computeSplitSize() to make sure that each split only contains full records. Each InputSplit passed to FixedLengthBinaryRecordReader will start at the first byte of a record, and the last byte will the last byte of a record. */ override def computeSplitSize(blockSize: Long, minSize: Long, maxSize: Long): Long = { val defaultSize = Math.max(minSize, Math.min(maxSize, blockSize)) // If the default size is less than the length of a record, make it equal to it Otherwise, make sure the split size is as close to possible as the default size,but still contains a complete set of records, with the first record starting at the first byte in the split and the last record ending with the last byte if (defaultSize < recordLength) { recordLength.toLong } else { (Math.floor(defaultSize / recordLength) * recordLength).toLong } }
5.性能調優ui
a.調整並行度
val rdd2 = rdd1.map((_,1)).reduceByKey(_+_,10)//shuffle時調整並行度 shuffle原理:https://www.cnblogs.com/diaozhaojian/p/9635829.html
//Mapreduce和spark shuffle區別(https://mp.weixin.qq.com/s/FT2V9IwNoMl_JU_UDulJ-w) rdd2.repartition(10)//對RDD作重分區,會打亂數據作重分區 rdd2.coalesce(10)//減小分區,調用Repartition(numPartitions, shuffle = false, logicalPlan)
6.RDD.CheckPoint與RDD.persist(https://www.cnblogs.com/jcchoiling/p/6513569.html)(https://blog.csdn.net/rlnLo2pNEfx9c/article/details/81417061)
CheckPoint 把RDD持久化到HDFS上,加強容錯性。job運行時若檢測到RDD.CheckPoint會啓動一個新job作checkpoint操做,同時刪除父RDD,截斷依賴鏈,改變lineage(譜系圖)。
設置檢查點目錄sc.setCheckpointDir("hdfs://IP:9000/checkpoint/")。由於checkpoint要從新計算,所以checkpoint以前建議先persist。
persist 根據存儲級別,把數據緩存到不一樣介質上,只是保存數據,不改變DAG。
Persist 由executor的blockManager管理,所以driver結束以後persist到 磁盤的數據也會清除,而checkpoint 持久化到HDFS,利用HDFS高可用,不調用remove不會清除。
7.Parquet與ORC(https://blog.csdn.net/yu616568/article/details/51868447)(ORC:https://www.cnblogs.com/ITtangtang/p/7677912.html)
parquet支持嵌套數據結構,經過repeated和group實現Map、Array等複雜數據結構。(每一個字段有重複次數(required(出現一次)repeated(0或屢次)optional(0或1次))、字段類型(group和primitive)和字段名三個屬性)。
多行記錄構成一個行組(row group),行組中每一個列做爲一個列塊(column chunk),不一樣列塊可採起不一樣壓縮方式,列塊劃分爲多個頁。爲了更好地存儲嵌套格式,頁的成員值由value、Repetition level和Definition level三部分組成,對於repeated類型列,repetition lever標記了所處哪條記錄已經在該記錄位置。
每一個行組的統計信息包括schema、列塊的最大最小值空值數等信息。每一個頁的元數據包括value數目,數據頁、索引頁的offset等信息。
ORC格式經過把struct類型生成一個schema樹,struct類型做爲根節點,中序遍歷子節點,獲得全部葉子節點的數據,交由父節點封裝成嵌套數據結構。以此來支持LIST、STRUCT、MAP等複雜結構。