Spark使用總結與分享

背景 java

   

使用spark開發已有幾個月。相比於python/hive,scala/spark學習門檻較高。尤爲記得剛開時,舉步維艱,進展十分緩慢。不過謝天謝地,這段苦澀(bi)的日子過去了。憶苦思甜,爲了不項目組的其餘同窗走彎路,決定總結和梳理spark的使用經驗。 python

   

Spark基礎 算法

   

基石RDD sql

   

spark的核心是RDD(彈性分佈式數據集),一種通用的數據抽象,封裝了基礎的數據操做,如map,filter,reduce等。RDD提供數據共享的抽象,相比其餘大數據處理框架,如MapReduce,Pegel,DryadLINQ和HIVE等均缺少此特性,因此RDD更爲通用。 apache

   

簡要地歸納RDD:RDD是一個不可修改的,分佈的對象集合。每一個RDD由多個分區組成,每一個分區能夠同時在集羣中的不一樣節點上計算。RDD能夠包含Python,Java和Scala中的任意對象。 編程

   

Spark生態圈中應用都是基於RDD構建(下圖),這一點充分說明RDD的抽象足夠通用,能夠描述大多數應用場景。 緩存

   

RDD操做類型—轉換和動做 網絡

   

RDD的操做主要分兩類:轉換(transformation)和動做(action)。兩類函數的主要區別是,轉換接受RDD並返回RDD,而動做接受RDD可是返回非RDD。轉換採用惰性調用機制,每一個RDD記錄父RDD轉換的方法,這種調用鏈表稱之爲血緣(lineage);而動做調用會直接計算。 閉包

採用惰性調用,經過血緣鏈接的RDD操做能夠管道化(pipeline),管道化的操做能夠直接在單節點完成,避免屢次轉換操做之間數據同步的等待框架

使用血緣串聯的操做能夠保持每次計算相對簡單,而不用擔憂有過多的中間數據,由於這些血緣操做都管道化了,這樣也保證了邏輯的單一性,而不用像MapReduce那樣,爲了竟可能的減小map reduce過程,在單個map reduce中寫入過多複雜的邏輯。

   

   

RDD使用模式

   

RDD使用具備通常的模式,能夠抽象爲下面的幾步

  1. 加載外部數據,建立RDD對象
  2. 使用轉換(如filter),建立新的RDD對象
  3. 緩存須要重用的RDD
  4. 使用動做(如count),啓動並行計算

   

RDD高效的策略

   

Spark官方提供的數據是RDD在某些場景下,計算效率是Hadoop的20X。這個數據是否有水分,咱們先不追究,可是RDD效率高的由必定機制保證的:

  1. RDD數據只讀,不可修改。若是須要修改數據,必須從父RDD轉換(transformation)到子RDD。因此,在容錯策略中,RDD沒有數據冗餘,而是經過RDD父子依賴(血緣)關係進行重算實現容錯。
  2. RDD數據在內存中,多個RDD操做之間,數據不用落地到磁盤上,避免沒必要要的I/O操做。
  3. RDD存放的數據能夠是java對象,因此避免的沒必要要的對象序列化和反序列化。

總而言之,RDD高效的主要因素是儘可能避免沒必要要的操做和犧牲數據的操做精度,用來提升計算效率。

   

   

Spark使用技巧

   

RDD基本函數擴展

   

RDD雖然提供了不少函數,可是畢竟仍是有限的,有時候須要擴展,自定義新的RDD的函數。在spark中,能夠經過隱式轉換,輕鬆實現對RDD擴展。畫像開發過程當中,平凡的會使用rollup操做(相似HIVE中的rollup),計算多個級別的聚合數據。下面是具體實,

/**

* 擴展spark rdd,爲rdd提供rollup方法

*/

implicit class RollupRDD[T: ClassTag](rdd: RDD[(Array[String], T)]) extends Serializable {

 

/**

* 相似Sql中的rollup操做

*

* @param aggregate 聚合函數

* @param keyPlaceHold key佔位符,默認採用FaceConf.STAT_SUMMARY

* @param isCache,確認是否緩存數據

* @return 返回聚合後的數據

*/

def rollup[U: ClassTag](

aggregate: Iterable[T] => U,

keyPlaceHold: String = FaceConf.STAT_SUMMARY,

isCache: Boolean = true): RDD[(Array[String], U)] = {

 

if (rdd.take(1).isEmpty) {

return rdd.map(x => (Array[String](), aggregate(Array[T](x._2))))

}

 

if (isCache) {

rdd.cache // 提升計算效率

}

val totalKeyCount = rdd.first._1.size

val result = { 1 to totalKeyCount }.par.map(untilKeyIndex => { // 並行計算

rdd.map(row => {

val combineKey = row._1.slice(0, untilKeyIndex).mkString(FaceConf.KEY_SEP) // 組合key

(combineKey, row._2)

}).groupByKey.map(row => { // 聚合計算

val oldKeyList = row._1.split(FaceConf.KEY_SEP)

val newKeyList = oldKeyList ++ Array.fill(totalKeyCount - oldKeyList.size) { keyPlaceHold }

(newKeyList, aggregate(row._2))

})

}).reduce(_ ++ _) // 聚合結果

 

result

}

 

}

上面代碼聲明瞭一個隱式類,具備一個成員變量rdd,類型是RDD[(Array[String], T)],那麼若是應用代碼中出現了任何這樣的rdd對象,而且import當前的隱式轉換,那麼編譯器就會將這個rdd當作上面的隱式類的對象,也就可使用rollup函數,和通常的map,filter方法同樣。

   

   

RDD操做閉包外部變量原則

   

RDD相關操做都須要傳入自定義閉包函數(closure),若是這個函數須要訪問外部變量,那麼須要遵循必定的規則,不然會拋出運行時異常。閉包函數傳入到節點時,須要通過下面的步驟:

  1. 驅動程序,經過反射,運行時找到閉包訪問的全部變量,並封成一個對象,而後序列化該對象
  2. 將序列化後的對象經過網絡傳輸到worker節點
  3. worker節點反序列化閉包對象
  4. worker節點執行閉包函數,

注意:外部變量在閉包內的修改不會被反饋到驅動程序。

簡而言之,就是經過網絡,傳遞函數,而後執行。因此,被傳遞的變量必須能夠序列化,不然傳遞失敗。本地執行時,仍然會執行上面四步。

   

廣播機制也能夠作到這一點,可是頻繁的使用廣播會使代碼不夠簡潔,並且廣播設計的初衷是將較大數據緩存到節點上,避免屢次數據傳輸,提升計算效率,而不是用於進行外部變量訪問。

   

   

RDD數據同步

   

RDD目前提供兩個數據同步的方法:廣播和累計器。

   

廣播 broadcast

前面提到過,廣播能夠將變量發送到閉包中,被閉包使用。可是,廣播還有一個做用是同步較大數據。好比你有一個IP庫,可能有幾G,在map操做中,依賴這個ip庫。那麼,能夠經過廣播將這個ip庫傳到閉包中,被並行的任務應用。廣播經過兩個方面提升數據共享效率:1,集羣中每一個節點(物理機器)只有一個副本,默認的閉包是每一個任務一個副本;2,廣播傳輸是經過BT下載模式實現的,也就是P2P下載,在集羣多的狀況下,能夠極大的提升數據傳輸速率。廣播變量修改後,不會反饋到其餘節點。

   

累加器 Accumulator

累加器是一個write-only的變量,用於累加各個任務中的狀態,只有在驅動程序中,才能訪問累加器。並且,截止到1.2版本,累加器有一個已知的缺陷,在action操做中,n個元素的RDD能夠確保累加器只累加n次,可是在transformation時,spark不確保,也就是累加器可能出現n+1次累加。

   

目前RDD提供的同步機制粒度太粗,尤爲是轉換操做中變量狀態不能同步,因此RDD沒法作複雜的具備狀態的事務操做。不過,RDD的使命是提供一個通用的並行計算框架,估計永遠也不會提供細粒度的數據同步機制,由於這與其設計的初衷是違背的。

   

RDD優化技巧

   

RDD緩存

須要使用屢次的數據須要cache,不然會進行沒必要要的重複操做。舉個例子

val data = … // read from tdw

println(data.filter(_.contains("error")).count)

println(data.filter(_.contains("warning")).count)

上面三段代碼中,data變量會加載兩次,高效的作法是在data加載完後,馬上持久化到內存中,以下

val data = … // read from tdw

data.cache

println(data.filter(_.contains("error")).count)

println(data.filter(_.contains("warning")).count)

這樣,data在第一加載後,就被緩存到內存中,後面兩次操做均直接使用內存中的數據。

   

轉換並行化

RDD的轉換操做時並行化計算的,可是多個RDD的轉換一樣是能夠並行的,參考以下

val dataList:Array[RDD[Int]] = …

val sumList = data.list.map(_.map(_.sum))

上面的例子中,第一個map是便利Array變量,串行的計算每一個RDD中的每行的sum。因爲每一個RDD之間計算是沒有邏輯聯繫的,因此理論上是能夠將RDD的計算並行化的,在scala中能夠輕鬆試下,以下

val dataList:Array[RDD[Int]] = …

val sumList = data.list.par.map(_.map(_.sum))

注意紅色代碼。

   

減小shuffle網絡傳輸

通常而言,網絡I/O開銷是很大的,減小網絡開銷,能夠顯著加快計算效率。任意兩個RDD的shuffle操做(join等)的大體過程以下,

用戶數據userData和事件events數據經過用戶id鏈接,那麼會在網絡中傳到另一個節點,這個過程當中,有兩個網絡傳輸過程。Spark的默認是完成這兩個過程。可是,若是你多告訴spark一些信息,spark能夠優化,只執行一個網絡傳輸。能夠經過使用、HashPartition,在userData"本地"先分區,而後要求events直接shuffle到userData的節點上,那麼就減小了一部分網絡傳輸,減小後的效果以下,

虛線部分都是在本地完成的,沒有網絡傳輸。在數據加載時,就按照key進行partition,這樣能夠經一部的減小本地的HashPartition的過程,示例代碼以下

val userData = sc.sequenceFile[UserID, UserInfo]("hdfs://…")

.partitionBy(new HashPartitioner(100)) // Create 100 partitions

.persist()

注意,上面必定要persist,不然會重複計算屢次。100用來指定並行數量。

   

Spark其餘

   

Spark開發模式

   

因爲spark應用程序是須要在部署到集羣上運行的,致使本地調試比較麻煩,因此通過這段時間的經驗累積,總結了一套開發流程,目的是爲了儘量的提升開發調試效率,同時保證開發質量。固然,這套流程可能也不是最優的,後面須要持續改進。

整個流程比較清楚,這裏主要談談爲何須要單元測試。公司內的大多數項目,通常不提倡單元測試,並且因爲項目進度壓力,開發人員會很是抵觸單元測試,由於會花費"額外"的精力。Bug這東西不會由於項目趕進度而消失,並且剛好相反,可能由於趕進度,而高於平均水平。因此,若是不花時間進行單元測試,那麼會花一樣多,甚至更多的時間調試。不少時候,每每一些很小的bug,卻致使你花了很長時間去調試,而這些bug,剛好是很容易在單元測試中發現的。並且,單元測試還能夠帶來兩個額外的好處:1)API使用範例;2)迴歸測試。因此,仍是單元測試吧,這是一筆投資,並且ROI還挺高!不過凡事須要掌握分寸,單元測試應該根據項目緊迫程度調整粒度,作到有所爲,有所不爲。

 

Spark其餘功能

   

前面提到了spark生態圈,spark除了核心的RDD,還提供了之上的幾個很使用的應用:

  1. Spark SQL: 相似hive,使用rdd實現sql查詢
  2. Spark Streaming: 流式計算,提供實時計算功能,相似storm
  3. MLLib:機器學習庫,提供經常使用分類,聚類,迴歸,交叉檢驗等機器學習算法並行實現。
  4. GraphX:圖計算框架,實現了基本的圖計算功能,經常使用圖算法和pregel圖編程框架。

   

後面須要繼續學習和使用上面的功能,尤爲是與數據挖掘強相關的MLLib。

   

參考資料

相關文章
相關標籤/搜索