spark
-------------
基於hadoop的mr,擴展MR模型高效使用MR模型,內存型集羣計算,提升app處理速度。java
spark特色
-------------
速度:在內存中存儲中間結果。
支持多種語言。Scala、Java、Python
內置了80+的算子.
高級分析:MR,SQL/ Streamming /mllib / graphpython
RDD:
----------------
是spark的基本數據結構,是不可變數據集。RDD中的數據集進行邏輯分區,每一個分區能夠單獨在集羣節點
進行計算。能夠包含任何java,scala,python和自定義類型。mysql
RDD是隻讀的記錄分區集合。RDD具備容錯機制。sql
建立RDD方式,1、並行化一個現有集合。數組
hadoop 花費90%時間用戶rw。、
內存處理計算。在job間進行數據共享。內存的IO速率高於網絡和disk的10 ~ 100之間。網絡
內部包含5個主要屬性
-----------------------
1.分區列表
2.針對每一個split的計算函數。
3.對其餘rdd的依賴列表
4.可選,若是是KeyValueRDD的話,能夠帶分區類。
5.可選,首選塊位置列表(hdfs block location);數據結構
RDD變換app
rdd的變換方法都是lazy執行的
------------------
返回指向新rdd的指針,在rdd之間建立依賴關係。每一個rdd都有計算函數和指向父RDD的指針。函數
map() //對每一個元素進行變換,應用變換函數
//(T)=>Voop
mapPartitions() //對每一個分區進行應用變換,輸入的Iterator,返回新的迭代器,能夠對分區進行函數處理。
//針對每一個數據分區進行操做,入參是分區數據的Iterator,map() 針對分區中的每一個元素進行操做。
mapPartitions() //Iterator<T> => Iterator<U>
注:最好設置每一個分區都對應有一個線程。
filter() //過濾器,(T)=>Boolean
flatMap() //壓扁,T => TraversableOnce[U]
//同mapPartitions方法同樣都是針對分區處理,只不過這個方法能夠獲取到分區索引
mapPartitionsWithIndex(func) //(Int, Iterator<T>) => Iterator<U>
//採樣返回採樣的RDD子集。
//withReplacement 元素是否能夠屢次採樣.
//fraction : 指望採樣數量.[0,1]
//表示一個種子,根據這個seed隨機抽取,通常都只用到前兩個參數
sample(withReplacement, fraction, seed)
做用:在數據傾斜的時候,咱們那麼多數據若是想知道那個key傾斜了,就須要咱們採樣獲取這些key,出現次數陊的key就是致使數據傾斜的key。若是這些key數據不是很重要的話,能夠過濾掉,這樣就解決了數據傾斜。
union() //相似於mysql union操做。
intersection //交集,提取兩個rdd中都含有的元素。
distinct([numTasks])) //去重,去除重複的元素。
groupByKey() //(K,V) => (K,Iterable<V>) 使用前須要構造出對偶的RDD
reduceByKey(*) //按key聚合。注意他是一個RDD變換方法,不是action
aggregateByKey(zeroValue)(seqOp, combOp, [numTasks])//按照key進行聚合,這個函數邏輯較爲複雜請看aggregateByKey函數的專題
sortByKey //根據映射的Key進行排序,可是隻能根據Key排序
sortBy //比sortByKey更加靈活強大的排序,可根據元組中任意字段排序
join(otherDataset, [numTasks]) //橫向鏈接,有兩種數據(K,V)和(K,W),連接後返回(K,(V,W)),兩個元組一一對應的
cogroup //協分組,(K,V)和(K,W)分組後返回(K,(V,W)),注意協分組不是一一對應的分組後須要(此處注意與join的區別)
cartesian(otherDataset) //笛卡爾積,RR[(A,B)] RDD[(1,2)] => RDD[(A,1),(A,2),(B,1),(B,2)]
pipe //將rdd的元素傳遞給腳本或者命令,執行結果返回造成新的RDD
coalesce(numPartitions) //減小分區
repartition //再分區
repartitionAndSortWithinPartitions(partitioner)//再分區並在分區內進行排序
RDD Action
Spack的中的方法都是懶的,,只有遇到了action類型的方法纔會真正的執行
------------------
collect() //收集rdd元素造成數組.
count() //統計rdd元素的個數
reduce() //聚合,返回一個值。
first //取出第一個元素take(1)
take //
takeSample (withReplacement,num, [seed])
takeOrdered(n, [ordering])
saveAsTextFile(path) //保存到文件
saveAsSequenceFile(path) //保存成序列文件 sc.sequenceFile讀取序列文件
saveAsObjectFile(path) (Java and Scala)
countByKey() //按照key統計有幾個value
數據傾斜
------------------------------
因爲大量相同的Key,在reduce合併計算的過程當中,大量相同的Key被分配到了同一個集羣節點,致使集羣中這個節點計算壓力很是大。
本例採用的解決方案是,在map截斷將Key先接上一個隨機數打散,而後在reduce計算後,再次map還原key,而後進行最終reduce。
Spark WebUI 上面代碼運行的DAG 有效無環圖,咱們能夠清楚地看到每一次的reduce聚合都會從新劃分階段