Spark實戰

                                                                               1.Spark簡單介紹
html

什麼是Spark?git

  Spark是UC BerkeleyAmp實驗室開源的類Hadoop MapReduce的通用並行計算框架github

                                                          Spark    VS   MapReduce算法

MapReduce            數據庫

                                 ①.缺乏對迭代計算以及DAG運算的支持apache

                                 .Shuffle過程屢次排序和落地,MR之間的數據需要落Hdfs文件系統編程

Spark                        數組

                                 ①.提供了一套支持DAG圖的分佈式並行計算的編程框架,下降屢次計算之間中間結果寫到hdfs的開銷緩存

                                 .提供Cache機制來支持需要重複迭代計算或者屢次數據共享,下降數據讀取的IO開銷多線程

                                 .使用多線程池模型來下降task啓動開稍。shuffle過程當中避免沒必要要的sort操做以及下降磁盤IO操做

                                 .普遍的數據集操做類型(map,groupby,count,filter)

                                 ⑤.Spark經過提供豐富的Scala, Java。PythonAPI及交互式Shell來提升可用性

                                 ⑥.RDD之間維護了血統關係,一旦RDDfail掉了。能經過父RDD本身主動重建,保證了容錯性。 採用容錯的、高可伸縮性的akka做爲通信框架

 



                                                 2.Spark生態系統


                                 



                                                                      3.Scala集合簡單介紹


vallist2 = List(1,2,3,4,5)

list2.map{x=>x +8}     //{9,10,11,12,13}

list2.filter{x=>x > 3}      //{4,5}

list2.reduce(_ + _)

不少其餘scala學習網址:http://twitter.github.io/scala_school/zh_cn/collections.html


                                                                      4.spark的關鍵組件


Master

Worker

SparkContext(client)



                                                                      5.核心概念:彈性分佈式數據集

  Spark環繞的概念是彈性分佈式數據集(RDD),這是一個有容錯機制並可以被並行操做的元素集合。

RDD的特色:

失敗本身主動重建。對於丟失部分數據分區僅僅需依據它的lineage(見文章最後介紹)就可又一次計算出來,而不需要作特定的Checkpoint

可以控制存儲級別(內存、磁盤等)來進行重用。

默認是存儲於內存,但當內存不足時。RDD會spill到disk

必須是可序列化的。

眼下RDD有兩種建立方式:並行集合(ParallelizedCollections):接收一個已經存在的Scala集合,而後進行各類並行計算。

Hadoop數據集(HadoopDatasets):在一個文件的每條記錄上運行函數。僅僅要文件系統是HDFS,或者hadoop支持的隨意存儲系統就能夠。這兩種類型的RDD都可以經過一樣的方式進行操做。

1.並行集合(Parallelized Collections)

並行集合是經過調用SparkContext的parallelize方法。在一個已經存在的Scala集合上建立的(一個Seq對象)。

集合的對象將會被拷貝,建立出一個可以被並行操做的分佈式數據集。

好比。如下的輸出。演示了怎樣從一個數組建立一個並行集合:


scala> val data = Array(1, 2, 3, 4, 5)

scala> val distData =sc.parallelize(data)

一旦分佈式數據集(distData)被建立好,它們將可以被並行操做。好比,咱們可以調用distData.reduce(_+_ )來將數組的元素相加

2.Hadoop數據集(Hadoop Datasets)

Spark可以從存儲在HDFS,或者Hadoop支持的其餘文件系統(包括本地文件,HBase等等)上的文件建立分佈式數據集。

Text file的RDDs可以經過SparkContext’stextFile的方式建立,

scala> val distFile =sc.textFile("data.txt")


並行集合的一個重要參數是slices,表示數據集切分的份數。Spark將會在集羣上爲每一份數據起一個任務。典型地。你可以在集羣的每一個CPU上分佈2-4個slices.通常來講,Spark會嘗試依據集羣的情況,來本身主動設定slices的數目。

然而,你也可以經過傳遞給parallelize的第二個參數來進行手動設置。(好比:sc.parallelize(data,10)).

textFile方法也可以經過輸入一個可選的第二參數,來控制文件的分片數目。

默認狀況下,Spark爲每一塊文件建立一個分片(HDFS默認的塊大小爲64MB),但是你也可以經過傳入一個更大的值,來指定一個更高的片值。注意,你不能指定一個比塊數更小的片值(和Map數不能小於Block數同樣,但是可以比它多)


                                                                        6.RDD的操做


RDD支持兩種操做:轉換(transformation)從現有的數據集建立一個新的數據集;而動做(actions)在數據集上運行計算後。返回一個值給驅動程序。好比,map就是一種轉換。它將數據集每一個元素都傳遞給函數,並返回一個新的分佈數據集表示結果。

還有一方面。reduce是一種動做。經過一些函數將所有的元素疊加起來。並將終於結果返回給Driver程序。

                                                                                                                       轉換(transformation)

 轉換

含義

map(func)

返回一個新分佈式數據集,由每一個輸入元素通過func函數轉換後組成

filter(func)

返回一個新數據集,由通過func函數計算後返回值爲true的輸入元素組成

flatMap(func)

類似於map,但是每一個輸入元素可以被映射爲0或多個輸出元素(所以func應該返回一個序列,而不是單一元素)

distinct([numTasks]))

返回一個包括源數據集中所有不重複元素的新數據集

groupByKey([numTasks])

在一個(K,V)對的數據集上調用。返回一個(KSeq[V])對的數據集註意:默認狀況下。僅僅有8個並行任務來作操做。但是你可以傳入一個可選的numTasks參數來改變它

reduceByKey(func[numTasks])

在一個(K。V)對的數據集上調用時。返回一個(K。V)對的數據集。使用指定的reduce函數,將一樣key的值聚合到一塊兒。

類似groupByKey,reduce任務個數是可以經過第二個可選參數來配置的

sortByKey([ascending[numTasks])

在一個(K,V)對的數據集上調用,K必須實現Ordered接口,返回一個依照Key進行排序的(K,V)對數據集。升序或降序由ascending布爾參數決定

join(otherDataset[numTasks])

在類型爲(K,V)和(K,W)類型的數據集上調用時,返回一個一樣key相應的所有元素對在一塊兒的(K, (V, W))數據集


動做(actions)

 動做

含義

reduce(func)

經過函數func(接受兩個參數,返回一個參數)彙集數據集中的所有元素。這個功能必須可交換且可關聯的,從而可以正確的被並行運行。

collect()

在驅動程序中,以數組的形式。返回數據集的所有元素。這通常會在使用filter或者其餘操做並返回一個足夠小的數據子集後再使用會比較實用。

count()

返回數據集的元素的個數。

first()

返回數據集的第一個元素(類似於take(1))

take(n)

返回一個由數據集的前n個元素組成的數組。

注意,這個操做眼下並非並行運行,而是由驅動程序計算所有的元素

saveAsTextFile(path)

將數據集的元素,以textfile的形式,保存到本地文件系統,HDFS或者不論什麼其餘hadoop支持的文件系統。對於每一個元素,Spark將會調用toString方法,將它轉換爲文件裏的文本行

countByKey()

對(K,V)類型的RDD有效,返回一個(K,Int)對的Map,表示每一個key相應的元素個數

foreach(func)

在數據集的每一個元素上。運行函數func進行更新。這通常用於邊緣效果,好比更新一個累加器,或者和外部存儲系統進行交互,好比HBase


                                                                                                                                                                                                                                             

                                                                        7. RDD依賴


轉換操做,最基本的操做,是Spark生成DAG圖的對象,轉換操做並不立刻運行。在觸發行動操做後再提交給driver處理,生成DAG圖--> Stage --> Task  --> Worker運行。按轉化操做在DAG圖中做用。可以分紅兩種:

窄依賴

»輸入輸出一對一的操做。且結果RDD的分區結構不變,主要是map、flatMap;

»輸入輸出一對一,但結果RDD的分區結構發生了變化,如union等。

»從輸入中選擇部分元素的操做,如filter、distinct、subtract、sample。


寬依賴。寬依賴會涉及shuffle類,在DAG圖解析時以此爲邊界產生Stage。如圖所看到的。

»對單個RDD基於key進行重組和reduce,如groupByKey、reduceByKey;

»對兩個RDD基於key進行join和重組,如join等。



Stage的劃分

在RDD的論文中有具體的介紹,簡單的說是以shuffle和result這兩種類型來劃分。在Spark中有兩類task。一類是shuffleMapTask,一類是resultTask,第一類task的輸出是shuffle所需數據,第二類task的輸出是result,stage的劃分也以此爲依據,shuffle以前的所有變換是一個stage。shuffle以後的操做是還有一個stage。比方rdd.parallize(1 to 10).foreach(println) 這個操做沒有shuffle,直接就輸出了,那麼僅僅有它的task是resultTask,stage也僅僅有一個;假設是rdd.map(x=> (x, 1)).reduceByKey(_ + _).foreach(println),這個job因爲有reduce。因此有一個shuffle過程。那麼reduceByKey以前的是一個stage,運行shuffleMapTask,輸出shuffle所需的數據,reduceByKey到最後是一個stage。直接就輸出結果了。

假設job中有屢次shuffle,那麼每一個shuffle以前都是一個stage。



                                                                        8.Wordcount樣例


輸入文件樣例:由空格分隔的

aaabbbccc

ccc bbbddd

計算過程:讀入文件,把每行數據,按空格分紅單個的單詞。對每一個單詞記數

    val  ssc = newSparkContext().setAppName("WordCount")

    val lines =ssc.textFile(args(1))//輸入

     val words =

     lines.flatMap(x=>x.split(" "))

     words.cache()//緩存

     valwordCounts =

     words.map(x=>(x, 1) )

     val red =wordCounts.reduceByKey( (a,b)=>{a + b})

    red.saveAsTextFile(「/root/Desktop/out」) //行動


藍色的部分。生成相關的上下文,負責和Masterexutor通訊,請求資源,蒐集task運行的進度等

綠色的部分,僅僅是在定義相關的運算規則(也就是畫一張有向無環圖)。沒有運行實際的計算

當紅色的部分(action rdd)被調用的時候,纔會真正的向spark集羣去提交,Dag。。。依據以前代碼(也就是綠色的部分)生成rdd鏈。在依據分區算法生成partition,每一個partition相應一個Task,把這些task,交給Excutor去運行



                                                                      9. 提交job


./bin/spark-submit \

 --class org.apache.spark.examples.SparkPi \

 --master spark://hangzhou-jishuan-DDS0258.dratio.puppet:7077 \

 --executor-memory 2G \

 --total-executor-cores 3 \

 /opt/spark-1.0.2-bin-hadoop1/lib/spark-examples-1.0.2-hadoop1.0.4.jar \

 10

更具體的參數說明參見:http://blog.csdn.net/book_mmicky/article/details/25714545


                                                                      10.  編程接口


Scala

Spark使用Scala開發,默認使用Scala做爲編程語言。

編寫Spark程序比編寫HadoopMapReduce程序要簡單的多,SparK提供了Spark-Shell,可以在Spark-Shell測試程序。寫SparK程序的通常步驟就是建立或使用(SparkContext)實例,使用SparkContext建立RDD。而後就是對RDD進行操做。參見:http://spark.apache.org/docs/latest/quick-start.html#tab_scala_3
如:

    val sc = new SparkContext(master, appName,[sparkHome], [jars])

    val textFile =sc.textFile("hdfs://.....")

    textFile.map(....).filter(.....).....

Java

    JavaSparkContext sc = newJavaSparkContext(...); 

    JavaRDD lines =ctx.textFile("hdfs://...");

    JavaRDD words = lines.flatMap(

      new FlatMapFunction<String,String>() {

         public Iterable call(String s) {

            return Arrays.asList(s.split("")); } } );

Python

    from pyspark import SparkContext

    sc = SparkContext("local","Job Name", pyFiles=['MyFile.py', 'lib.zip', 'app.egg'])

    words =sc.textFile("/usr/share/dict/words")

    words.filter(lambda w:w.startswith("spar")).take(5)


                                                                        11. Spark運行架構

 Sparkon YARN 運行過程(cluster模式)

1.用戶經過bin/spark-submit或bin/spark-class 向YARN提交Application

2.RM爲Application分配第一個container,並在指定節點的container上啓動SparkContext。

3.SparkContext向RM申請資源以運行Executor

4.RM分配Container給SparkContext,SparkContext和相關的NM通信,在得到的Container上啓動 StandaloneExecutorBackend,StandaloneExecutorBackend啓動後,開始向SparkContext註冊並申請  Task

5.SparkContext分配Task給StandaloneExecutorBackend運行

6.StandaloneExecutorBackend運行Task並向SparkContext彙報運行情況

7.Task運行完成。SparkContext歸還資源給NM,並註銷退出。



                                                                        12.Spark SQL

Spark SQL是一個即席查詢系統,其前身是shark,只是代碼差點兒都重寫了,但利用了shark的最好部份內容。

SparkSQL可以經過SQL表達式、HiveQL或者Scala DSL在Spark上運行查詢。眼下Spark SQL仍是一個alpha版本號。


                                                                         13.SparkStreaming

     SparkStreaming是一個對實時數據流進行高通量、容錯處理的流式處理系統,可以對多種數據源(如Kdfka、Flume、Twitter、Zero和TCP套接字)進行類似map、reduce、join、window等複雜操做,並將結果保存到外部文件系統、數據庫或應用到實時儀表盤。



SparkStreaming流式處理系統特色有:

   將流式計算分解成一系列短小的(按秒)批處理做業

   將失敗或者運行較慢的任務在其餘節點上並行運行

   較強的容錯能力(checkpoint等)

   使用和RDD同樣的語義

                                                        

./bin/run-exampleorg.apache.spark.examples.streaming.NetworkWordCount localhost 9999

nc-lk 9999



                                                                         14. 練習題

有一批ip。找出出現次數最多的前50個?

10.129.41.91

61.172.251.20

10.150.9.240

...

答案:

data.map(word=>(word,1)).reduceByKey(_+_).map(word=>(word._2,word._1)).sortByKey(false).map(word=>(word._2,word._1)).take(50)



                                                                       15.延伸

Lineage(血統)

Spark處理分佈式運算環境下的數據容錯性(節點實效/數據丟失)問題時採用血統關係(Lineage)方案。RDD數據集經過所謂的血統關係(Lineage)記住了它是怎樣從其餘RDD中演變過來的。相比其餘系統的細顆粒度的內存數據更新級別的備份或者LOG機制RDDLineage記錄的是粗顆粒度的特定數據轉換(Transformation)操做(filter, map, join etc.)行爲。當這個RDD的部分分區數據丟失時,它可以經過Lineage獲取足夠的信息來又一次運算和恢復丟失的數據分區。

這樣的粗顆粒的數據模型。限制了Spark的運用場合。但同一時候相比細顆粒度的數據模型,也帶來了性能的提高。

RDDLineage依賴方面分爲兩種NarrowDependenciesWideDependencies用來解決數據容錯的高效性。NarrowDependencies是指父RDD的每一個分區最多被一個子RDD的分區所用,表現爲一個父RDD的分區相應於一個子RDD的分區或多個父RDD的分區相應於一個子RDD的分區,也就是說一個父RDD的一個分區不可能相應一個子RDD的多個分區。WideDependencies是指子RDD的分區依賴於父RDD的多個分區或所有分區,也就是說存在一個父RDD的一個分區相應一個子RDD的多個分區。

對與WideDependencies。這樣的計算的輸入和輸出在不一樣的節點上,lineage方法對與輸入節點完善,而輸出節點宕機時,經過又一次計算,這樣的狀況下,這樣的方法容錯是有效的,不然無效,因爲沒法重試。需要向上其祖先追溯看可否夠重試(這就是lineage。血統的意思),NarrowDependencies對於數據的重算開銷要遠小於WideDependencies的數據重算開銷。

容錯

RDD計算。經過checkpint進行容錯,作checkpoint有兩種方式,一個是checkpointdata,一個是loggingthe updates。用戶可以控制採用哪一種方式來實現容錯,默認是loggingthe updates方式。經過記錄跟蹤所有生成RDD的轉換(transformations)也就是記錄每一個RDDlineage(血統)來又一次計算生成丟失的分區數據。

相關文章
相關標籤/搜索