Spark學習記錄(三)核心API模塊介紹

spark
-------------
基於hadoop的mr,擴展MR模型高效使用MR模型,內存型集羣計算,提升app處理速度。java

spark特色
-------------
速度:在內存中存儲中間結果。
支持多種語言。Scala、Java、Python
內置了80+的算子.
高級分析:MR,SQL/ Streamming /mllib / graphpython

RDD:
----------------
是spark的基本數據結構,是不可變數據集。RDD中的數據集進行邏輯分區,每一個分區能夠單獨在集羣節點
進行計算。能夠包含任何java,scala,python和自定義類型。mysql

RDD是隻讀的記錄分區集合。RDD具備容錯機制。sql

建立RDD方式,1、並行化一個現有集合。數組

hadoop 花費90%時間用戶rw。、

內存處理計算。在job間進行數據共享。內存的IO速率高於網絡和disk的10 ~ 100之間。網絡

內部包含5個主要屬性
-----------------------
1.分區列表
2.針對每一個split的計算函數。
3.對其餘rdd的依賴列表
4.可選,若是是KeyValueRDD的話,能夠帶分區類。
5.可選,首選塊位置列表(hdfs block location);數據結構

 

RDD變換app

rdd的變換方法都是lazy執行的
------------------
返回指向新rdd的指針,在rdd之間建立依賴關係。每一個rdd都有計算函數和指向父RDD的指針。函數


map() //對每一個元素進行變換,應用變換函數
//(T)=>Voop

mapPartitions() //對每一個分區進行應用變換,輸入的Iterator,返回新的迭代器,能夠對分區進行函數處理。

//針對每一個數據分區進行操做,入參是分區數據的Iterator,map() 針對分區中的每一個元素進行操做。

mapPartitions()  //Iterator<T> => Iterator<U>

注:最好設置每一個分區都對應有一個線程。

filter() //過濾器,(T)=>Boolean
flatMap() //壓扁,T => TraversableOnce[U]

 

//同mapPartitions方法同樣都是針對分區處理,只不過這個方法能夠獲取到分區索引

mapPartitionsWithIndex(func)  //(Int, Iterator<T>) => Iterator<U>

 

//採樣返回採樣的RDD子集。
//withReplacement 元素是否能夠屢次採樣.
//fraction : 指望採樣數量.[0,1]
//表示一個種子,根據這個seed隨機抽取,通常都只用到前兩個參數
sample(withReplacement, fraction, seed)

做用:在數據傾斜的時候,咱們那麼多數據若是想知道那個key傾斜了,就須要咱們採樣獲取這些key,出現次數陊的key就是致使數據傾斜的key。若是這些key數據不是很重要的話,能夠過濾掉,這樣就解決了數據傾斜。

 

 

union() //相似於mysql union操做。

 

intersection //交集,提取兩個rdd中都含有的元素。


distinct([numTasks])) //去重,去除重複的元素。

 

groupByKey() //(K,V) => (K,Iterable<V>)  使用前須要構造出對偶的RDD

reduceByKey(*) //按key聚合。注意他是一個RDD變換方法,不是action

 

aggregateByKey(zeroValue)(seqOp, combOp, [numTasks])//按照key進行聚合,這個函數邏輯較爲複雜請看aggregateByKey函數的專題

 

sortByKey //根據映射的Key進行排序,可是隻能根據Key排序

 

sortBy //比sortByKey更加靈活強大的排序,可根據元組中任意字段排序

 

join(otherDataset, [numTasks]) //橫向鏈接,有兩種數據(K,V)和(K,W),連接後返回(K,(V,W)),兩個元組一一對應的

 

cogroup //協分組,(K,V)和(K,W)分組後返回(K,(V,W)),注意協分組不是一一對應的分組後須要(此處注意與join的區別)



cartesian(otherDataset) //笛卡爾積,RR[(A,B)] RDD[(1,2)] => RDD[(A,1),(A,2),(B,1),(B,2)]

 

pipe //將rdd的元素傳遞給腳本或者命令,執行結果返回造成新的RDD


coalesce(numPartitions) //減小分區


repartition //再分區


repartitionAndSortWithinPartitions(partitioner)//再分區並在分區內進行排序

 

RDD Action

Spack的中的方法都是懶的,,只有遇到了action類型的方法纔會真正的執行
------------------
collect() //收集rdd元素造成數組.
count() //統計rdd元素的個數
reduce() //聚合,返回一個值。
first //取出第一個元素take(1)
take //
takeSample (withReplacement,num, [seed])
takeOrdered(n, [ordering])

saveAsTextFile(path) //保存到文件
saveAsSequenceFile(path) //保存成序列文件  sc.sequenceFile讀取序列文件

saveAsObjectFile(path) (Java and Scala)

countByKey()                       //按照key統計有幾個value 

 

數據傾斜

------------------------------

因爲大量相同的Key,在reduce合併計算的過程當中,大量相同的Key被分配到了同一個集羣節點,致使集羣中這個節點計算壓力很是大。

本例採用的解決方案是,在map截斷將Key先接上一個隨機數打散,而後在reduce計算後,再次map還原key,而後進行最終reduce。


Spark WebUI 上面代碼運行的DAG 有效無環圖,咱們能夠清楚地看到每一次的reduce聚合都會從新劃分階段

相關文章
相關標籤/搜索