1.Spark簡單介紹
html
什麼是Spark?git
Spark是UC BerkeleyAmp實驗室開源的類Hadoop MapReduce的通用並行計算框架github
Spark VS MapReduce算法
MapReduce 數據庫
①.缺乏對迭代計算以及DAG運算的支持apache
②.Shuffle過程屢次排序和落地,MR之間的數據需要落Hdfs文件系統編程
Spark 數組
①.提供了一套支持DAG圖的分佈式並行計算的編程框架,下降屢次計算之間中間結果寫到hdfs的開銷緩存
②.提供Cache機制來支持需要重複迭代計算或者屢次數據共享,下降數據讀取的IO開銷多線程
③.使用多線程池模型來下降task啓動開稍。shuffle過程當中避免沒必要要的sort操做以及下降磁盤IO操做
④.普遍的數據集操做類型(map,groupby,count,filter)
⑤.Spark經過提供豐富的Scala, Java。PythonAPI及交互式Shell來提升可用性
⑥.RDD之間維護了血統關係,一旦RDDfail掉了。能經過父RDD本身主動重建,保證了容錯性。 採用容錯的、高可伸縮性的akka做爲通信框架
2.Spark生態系統
3.Scala集合簡單介紹
vallist2 = List(1,2,3,4,5)
list2.map{x=>x +8} //{9,10,11,12,13}
list2.filter{x=>x > 3} //{4,5}
list2.reduce(_ + _)
不少其餘scala學習網址:http://twitter.github.io/scala_school/zh_cn/collections.html
4.spark的關鍵組件
•Master
•Worker
•SparkContext(client)
5.核心概念:彈性分佈式數據集
Spark環繞的概念是彈性分佈式數據集(RDD),這是一個有容錯機制並可以被並行操做的元素集合。
RDD的特色:
失敗本身主動重建。對於丟失部分數據分區僅僅需依據它的lineage(見文章最後介紹)就可又一次計算出來,而不需要作特定的Checkpoint
可以控制存儲級別(內存、磁盤等)來進行重用。
默認是存儲於內存,但當內存不足時。RDD會spill到disk
必須是可序列化的。
眼下RDD有兩種建立方式:並行集合(ParallelizedCollections):接收一個已經存在的Scala集合,而後進行各類並行計算。
Hadoop數據集(HadoopDatasets):在一個文件的每條記錄上運行函數。僅僅要文件系統是HDFS,或者hadoop支持的隨意存儲系統就能夠。這兩種類型的RDD都可以經過一樣的方式進行操做。
1.並行集合(Parallelized Collections)
•並行集合是經過調用SparkContext的parallelize方法。在一個已經存在的Scala集合上建立的(一個Seq對象)。
集合的對象將會被拷貝,建立出一個可以被並行操做的分佈式數據集。
好比。如下的輸出。演示了怎樣從一個數組建立一個並行集合:
•scala> val data = Array(1, 2, 3, 4, 5)
•scala> val distData =sc.parallelize(data)
•一旦分佈式數據集(distData)被建立好,它們將可以被並行操做。好比,咱們可以調用distData.reduce(_+_ )來將數組的元素相加
2.Hadoop數據集(Hadoop Datasets)
•Spark可以從存儲在HDFS,或者Hadoop支持的其餘文件系統(包括本地文件,HBase等等)上的文件建立分佈式數據集。
•Text file的RDDs可以經過SparkContext’stextFile的方式建立,
•scala> val distFile =sc.textFile("data.txt")
並行集合的一個重要參數是slices,表示數據集切分的份數。Spark將會在集羣上爲每一份數據起一個任務。典型地。你可以在集羣的每一個CPU上分佈2-4個slices.通常來講,Spark會嘗試依據集羣的情況,來本身主動設定slices的數目。
然而,你也可以經過傳遞給parallelize的第二個參數來進行手動設置。(好比:sc.parallelize(data,10)).
textFile方法也可以經過輸入一個可選的第二參數,來控制文件的分片數目。
默認狀況下,Spark爲每一塊文件建立一個分片(HDFS默認的塊大小爲64MB),但是你也可以經過傳入一個更大的值,來指定一個更高的片值。注意,你不能指定一個比塊數更小的片值(和Map數不能小於Block數同樣,但是可以比它多)
6.RDD的操做
RDD支持兩種操做:轉換(transformation)從現有的數據集建立一個新的數據集;而動做(actions)在數據集上運行計算後。返回一個值給驅動程序。好比,map就是一種轉換。它將數據集每一個元素都傳遞給函數,並返回一個新的分佈數據集表示結果。
還有一方面。reduce是一種動做。經過一些函數將所有的元素疊加起來。並將終於結果返回給Driver程序。
轉換(transformation)
轉換 |
含義 |
map(func) |
返回一個新分佈式數據集,由每一個輸入元素通過func函數轉換後組成 |
filter(func) |
返回一個新數據集,由通過func函數計算後返回值爲true的輸入元素組成 |
flatMap(func) |
類似於map,但是每一個輸入元素可以被映射爲0或多個輸出元素(所以func應該返回一個序列,而不是單一元素) |
distinct([numTasks])) |
返回一個包括源數據集中所有不重複元素的新數據集 |
groupByKey([numTasks]) |
在一個(K,V)對的數據集上調用。返回一個(K。Seq[V])對的數據集註意:默認狀況下。僅僅有8個並行任務來作操做。但是你可以傳入一個可選的numTasks參數來改變它 |
reduceByKey(func[numTasks]) |
在一個(K。V)對的數據集上調用時。返回一個(K。V)對的數據集。使用指定的reduce函數,將一樣key的值聚合到一塊兒。 類似groupByKey,reduce任務個數是可以經過第二個可選參數來配置的 |
sortByKey([ascending[numTasks]) |
在一個(K,V)對的數據集上調用,K必須實現Ordered接口,返回一個依照Key進行排序的(K,V)對數據集。升序或降序由ascending布爾參數決定 |
join(otherDataset[numTasks]) |
在類型爲(K,V)和(K,W)類型的數據集上調用時,返回一個一樣key相應的所有元素對在一塊兒的(K, (V, W))數據集 |
動做(actions)
動做 |
含義 |
reduce(func) |
經過函數func(接受兩個參數,返回一個參數)彙集數據集中的所有元素。這個功能必須可交換且可關聯的,從而可以正確的被並行運行。 |
collect() |
在驅動程序中,以數組的形式。返回數據集的所有元素。這通常會在使用filter或者其餘操做並返回一個足夠小的數據子集後再使用會比較實用。 |
count() |
返回數據集的元素的個數。 |
first() |
返回數據集的第一個元素(類似於take(1)) |
take(n) |
返回一個由數據集的前n個元素組成的數組。 注意,這個操做眼下並非並行運行,而是由驅動程序計算所有的元素 |
saveAsTextFile(path) |
將數據集的元素,以textfile的形式,保存到本地文件系統,HDFS或者不論什麼其餘hadoop支持的文件系統。對於每一個元素,Spark將會調用toString方法,將它轉換爲文件裏的文本行 |
countByKey() |
對(K,V)類型的RDD有效,返回一個(K,Int)對的Map,表示每一個key相應的元素個數 |
foreach(func) |
在數據集的每一個元素上。運行函數func進行更新。這通常用於邊緣效果,好比更新一個累加器,或者和外部存儲系統進行交互,好比HBase |
7. RDD依賴
•轉換操做,最基本的操做,是Spark生成DAG圖的對象,轉換操做並不立刻運行。在觸發行動操做後再提交給driver處理,生成DAG圖--> Stage --> Task --> Worker運行。按轉化操做在DAG圖中做用。可以分紅兩種:
•窄依賴
»輸入輸出一對一的操做。且結果RDD的分區結構不變,主要是map、flatMap;
»輸入輸出一對一,但結果RDD的分區結構發生了變化,如union等。
»從輸入中選擇部分元素的操做,如filter、distinct、subtract、sample。
•寬依賴。寬依賴會涉及shuffle類,在DAG圖解析時以此爲邊界產生Stage。如圖所看到的。
»對單個RDD基於key進行重組和reduce,如groupByKey、reduceByKey;
»對兩個RDD基於key進行join和重組,如join等。
Stage的劃分
在RDD的論文中有具體的介紹,簡單的說是以shuffle和result這兩種類型來劃分。在Spark中有兩類task。一類是shuffleMapTask,一類是resultTask,第一類task的輸出是shuffle所需數據,第二類task的輸出是result,stage的劃分也以此爲依據,shuffle以前的所有變換是一個stage。shuffle以後的操做是還有一個stage。比方rdd.parallize(1 to 10).foreach(println) 這個操做沒有shuffle,直接就輸出了,那麼僅僅有它的task是resultTask,stage也僅僅有一個;假設是rdd.map(x=> (x, 1)).reduceByKey(_ + _).foreach(println),這個job因爲有reduce。因此有一個shuffle過程。那麼reduceByKey以前的是一個stage,運行shuffleMapTask,輸出shuffle所需的數據,reduceByKey到最後是一個stage。直接就輸出結果了。
假設job中有屢次shuffle,那麼每一個shuffle以前都是一個stage。
8.Wordcount樣例
輸入文件樣例:由空格分隔的
aaabbbccc
ccc bbbddd
計算過程:讀入文件,把每行數據,按空格分紅單個的單詞。對每一個單詞記數
val ssc = newSparkContext().setAppName("WordCount")
val lines =ssc.textFile(args(1))//輸入
val words =
lines.flatMap(x=>x.split(" "))
words.cache()//緩存
valwordCounts =
words.map(x=>(x, 1) )
val red =wordCounts.reduceByKey( (a,b)=>{a + b})
red.saveAsTextFile(「/root/Desktop/out」) //行動
藍色的部分。生成相關的上下文,負責和Master,exutor通訊,請求資源,蒐集task運行的進度等
綠色的部分,僅僅是在定義相關的運算規則(也就是畫一張有向無環圖)。沒有運行實際的計算
當紅色的部分(action rdd)被調用的時候,纔會真正的向spark集羣去提交,Dag。。。依據以前代碼(也就是綠色的部分)生成rdd鏈。在依據分區算法生成partition,每一個partition相應一個Task,把這些task,交給Excutor去運行
9. 提交job
./bin/spark-submit \
--class org.apache.spark.examples.SparkPi \
--master spark://hangzhou-jishuan-DDS0258.dratio.puppet:7077 \
--executor-memory 2G \
--total-executor-cores 3 \
/opt/spark-1.0.2-bin-hadoop1/lib/spark-examples-1.0.2-hadoop1.0.4.jar \
10
更具體的參數說明參見:http://blog.csdn.net/book_mmicky/article/details/25714545
10. 編程接口
Scala
Spark使用Scala開發,默認使用Scala做爲編程語言。
編寫Spark程序比編寫HadoopMapReduce程序要簡單的多,SparK提供了Spark-Shell,可以在Spark-Shell測試程序。寫SparK程序的通常步驟就是建立或使用(SparkContext)實例,使用SparkContext建立RDD。而後就是對RDD進行操做。參見:http://spark.apache.org/docs/latest/quick-start.html#tab_scala_3
如:
• val sc = new SparkContext(master, appName,[sparkHome], [jars])
• val textFile =sc.textFile("hdfs://.....")
• textFile.map(....).filter(.....).....
•
Java
• JavaSparkContext sc = newJavaSparkContext(...);
• JavaRDD lines =ctx.textFile("hdfs://...");
• JavaRDD words = lines.flatMap(
• new FlatMapFunction<String,String>() {
• public Iterable call(String s) {
• return Arrays.asList(s.split("")); } } );
•
Python
• from pyspark import SparkContext
• sc = SparkContext("local","Job Name", pyFiles=['MyFile.py', 'lib.zip', 'app.egg'])
• words =sc.textFile("/usr/share/dict/words")
• words.filter(lambda w:w.startswith("spar")).take(5)
11. Spark運行架構
Sparkon YARN 運行過程(cluster模式)
1.用戶經過bin/spark-submit或bin/spark-class 向YARN提交Application
2.RM爲Application分配第一個container,並在指定節點的container上啓動SparkContext。
3.SparkContext向RM申請資源以運行Executor
4.RM分配Container給SparkContext,SparkContext和相關的NM通信,在得到的Container上啓動 StandaloneExecutorBackend,StandaloneExecutorBackend啓動後,開始向SparkContext註冊並申請 Task
5.SparkContext分配Task給StandaloneExecutorBackend運行
6.StandaloneExecutorBackend運行Task並向SparkContext彙報運行情況
7.Task運行完成。SparkContext歸還資源給NM,並註銷退出。
12.Spark SQL
Spark SQL是一個即席查詢系統,其前身是shark,只是代碼差點兒都重寫了,但利用了shark的最好部份內容。
SparkSQL可以經過SQL表達式、HiveQL或者Scala DSL在Spark上運行查詢。眼下Spark SQL仍是一個alpha版本號。
13.SparkStreaming
SparkStreaming是一個對實時數據流進行高通量、容錯處理的流式處理系統,可以對多種數據源(如Kdfka、Flume、Twitter、Zero和TCP套接字)進行類似map、reduce、join、window等複雜操做,並將結果保存到外部文件系統、數據庫或應用到實時儀表盤。
SparkStreaming流式處理系統特色有:
• 將流式計算分解成一系列短小的(按秒)批處理做業
• 將失敗或者運行較慢的任務在其餘節點上並行運行
• 較強的容錯能力(checkpoint等)
• 使用和RDD同樣的語義
./bin/run-exampleorg.apache.spark.examples.streaming.NetworkWordCount localhost 9999
nc-lk 9999
14. 練習題
有一批ip。找出出現次數最多的前50個?
10.129.41.91
61.172.251.20
10.150.9.240
...
答案:
data.map(word=>(word,1)).reduceByKey(_+_).map(word=>(word._2,word._1)).sortByKey(false).map(word=>(word._2,word._1)).take(50)
15.延伸
Lineage(血統)
Spark處理分佈式運算環境下的數據容錯性(節點實效/數據丟失)問題時採用血統關係(Lineage)方案。RDD數據集經過所謂的血統關係(Lineage)記住了它是怎樣從其餘RDD中演變過來的。相比其餘系統的細顆粒度的內存數據更新級別的備份或者LOG機制,RDD的Lineage記錄的是粗顆粒度的特定數據轉換(Transformation)操做(filter, map, join etc.)行爲。當這個RDD的部分分區數據丟失時,它可以經過Lineage獲取足夠的信息來又一次運算和恢復丟失的數據分區。
這樣的粗顆粒的數據模型。限制了Spark的運用場合。但同一時候相比細顆粒度的數據模型,也帶來了性能的提高。
RDD在Lineage依賴方面分爲兩種NarrowDependencies與WideDependencies用來解決數據容錯的高效性。NarrowDependencies是指父RDD的每一個分區最多被一個子RDD的分區所用,表現爲一個父RDD的分區相應於一個子RDD的分區或多個父RDD的分區相應於一個子RDD的分區,也就是說一個父RDD的一個分區不可能相應一個子RDD的多個分區。WideDependencies是指子RDD的分區依賴於父RDD的多個分區或所有分區,也就是說存在一個父RDD的一個分區相應一個子RDD的多個分區。
對與WideDependencies。這樣的計算的輸入和輸出在不一樣的節點上,lineage方法對與輸入節點完善,而輸出節點宕機時,經過又一次計算,這樣的狀況下,這樣的方法容錯是有效的,不然無效,因爲沒法重試。需要向上其祖先追溯看可否夠重試(這就是lineage。血統的意思),NarrowDependencies對於數據的重算開銷要遠小於WideDependencies的數據重算開銷。
容錯
在RDD計算。經過checkpint進行容錯,作checkpoint有兩種方式,一個是checkpointdata,一個是loggingthe updates。用戶可以控制採用哪一種方式來實現容錯,默認是loggingthe updates方式。經過記錄跟蹤所有生成RDD的轉換(transformations)也就是記錄每一個RDD的lineage(血統)來又一次計算生成丟失的分區數據。