【轉】Spark:一個高效的分佈式計算系統

原文地址:http://tech.uc.cn/?p=2116

概述

什麼是Spark

  • Spark是UC Berkeley AMP lab所開源的類Hadoop MapReduce的通用的並行計算框架,Spark基於map reduce算法實現的分佈式計算,擁有Hadoop MapReduce所具備的優勢;但不一樣於MapReduce的是Job中間輸出和結果能夠保存在內存中,從而再也不須要讀寫HDFS,所以Spark能更好地適用於數據挖掘與機器學習等須要迭代的map reduce的算法。其架構以下圖所示:

spark-framwork

Spark與Hadoop的對比

  • Spark的中間數據放到內存中,對於迭代運算效率更高。
    • Spark更適合於迭代運算比較多的ML和DM運算。由於在Spark裏面,有RDD的抽象概念。
  • Spark比Hadoop更通用。
    • Spark提供的數據集操做類型有不少種,不像Hadoop只提供了Map和Reduce兩種操做。好比mapfilterflatMapsamplegroupByKeyreduceByKeyunionjoincogroup,mapValuessort,partionBy等多種操做類型,Spark把這些操做稱爲Transformations。同時還提供Countcollectreducelookupsave等多種actions操做。
    • 這些多種多樣的數據集操做類型,給給開發上層應用的用戶提供了方便。各個處理節點之間的通訊模型再也不像Hadoop那樣就是惟一的Data Shuffle一種模式。用戶能夠命名,物化,控制中間結果的存儲、分區等。能夠說編程模型比Hadoop更靈活。
    • 不過因爲RDD的特性,Spark不適用那種異步細粒度更新狀態的應用,例如web服務的存儲或者是增量的web爬蟲和索引。就是對於那種增量修改的應用模型不適合。
  • 容錯性。
    • 在分佈式數據集計算時經過checkpoint來實現容錯,而checkpoint有兩種方式,一個是checkpoint data,一個是logging the updates。用戶能夠控制採用哪一種方式來實現容錯。
  • 可用性。
    • Spark經過提供豐富的Scala, Java,Python API及交互式Shell來提升可用性。

 

Spark與Hadoop的結合

  • Spark能夠直接對HDFS進行數據的讀寫,一樣支持Spark on YARN。Spark能夠與MapReduce運行於同集羣中,共享存儲資源與計算,數據倉庫Shark實現上借用Hive,幾乎與Hive徹底兼容。

Spark的適用場景

  • Spark是基於內存的迭代計算框架,適用於須要屢次操做特定數據集的應用場合。須要反覆操做的次數越多,所需讀取的數據量越大,受益越大,數據量小可是計算密集度較大的場合,受益就相對較小
  • 因爲RDD的特性,Spark不適用那種異步細粒度更新狀態的應用,例如web服務的存儲或者是增量的web爬蟲和索引。就是對於那種增量修改的應用模型不適合。
  • 總的來講Spark的適用面比較普遍且比較通用。

運行模式

  • 本地模式
  • Standalone模式
  • Mesoes模式
  • yarn模式

Spark生態系統

  • Shark ( Hive on Spark): Shark基本上就是在Spark的框架基礎上提供和Hive同樣的H iveQL命令接口,爲了最大程度的保持和Hive的兼容性,Shark使用了Hive的API來實現query Parsing和 Logic Plan generation,最後的PhysicalPlan execution階段用Spark代替Hadoop MapReduce。經過配置Shark參數,Shark能夠自動在內存中緩存特定的RDD,實現數據重用,進而加快特定數據集的檢索。同時,Shark經過UDF用戶自定義函數實現特定的數據分析學習算法,使得SQL數據查詢和運算分析能結合在一塊兒,最大化RDD的重複使用。
  • Spark streaming: 構建在Spark上處理Stream數據的框架,基本的原理是將Stream數據分紅小的時間片段(幾秒),以相似batch批量處理的方式來處理這小部分數據。Spark Streaming構建在Spark上,一方面是由於Spark的低延遲執行引擎(100ms+)能夠用於實時計算,另外一方面相比基於Record的其它處理框架(如Storm),RDD數據集更容易作高效的容錯處理。此外小批量處理的方式使得它能夠同時兼容批量和實時數據處理的邏輯和算法。方便了一些須要歷史數據和實時數據聯合分析的特定應用場合。
  • Bagel: Pregel on Spark,能夠用Spark進行圖計算,這是個很是有用的小項目。Bagel自帶了一個例子,實現了Google的PageRank算法。

在業界的使用

  • Spark項目在2009年啓動,2010年開源, 如今使用的有:Berkeley, Princeton, Klout, Foursquare, Conviva, Quantifind, Yahoo! Research & others, 淘寶等,豆瓣也在使用Spark的python克隆版Dpark。

Spark核心概念

Resilient Distributed Dataset (RDD)彈性分佈數據集

  • RDD是Spark的最基本抽象,是對分佈式內存的抽象使用,實現了以操做本地集合的方式來操做分佈式數據集的抽象實現。RDD是Spark最核心的東西,它表示已被分區,不可變的並可以被並行操做的數據集合,不一樣的數據集格式對應不一樣的RDD實現。RDD必須是可序列化的。RDD能夠cache到內存中,每次對RDD數據集的操做以後的結果,均可以存放到內存中,下一個操做能夠直接從內存中輸入,省去了MapReduce大量的磁盤IO操做。這對於迭代運算比較常見的機器學習算法, 交互式數據挖掘來講,效率提高比較大。
  • RDD的特色:java

    1. 它是在集羣節點上的不可變的、已分區的集合對象。
    2. 經過並行轉換的方式來建立如(map, filter, join, etc)。
    3. 失敗自動重建。
    4. 能夠控制存儲級別(內存、磁盤等)來進行重用。
    5. 必須是可序列化的。
    6. 是靜態類型的。
  • RDD的好處python

    1. RDD只能從持久存儲或經過Transformations操做產生,相比於分佈式共享內存(DSM)能夠更高效實現容錯,對於丟失部分數據分區只需根據它的lineage就可從新計算出來,而不須要作特定的Checkpoint。
    2. RDD的不變性,能夠實現類Hadoop MapReduce的推測式執行。
    3. RDD的數據分區特性,能夠經過數據的本地性來提升性能,這與Hadoop MapReduce是同樣的。
    4. RDD都是可序列化的,在內存不足時可自動降級爲磁盤存儲,把RDD存儲於磁盤上,這時性能會有大的降低但不會差於如今的MapReduce。
  • RDD的存儲與分區git

    1. 用戶能夠選擇不一樣的存儲級別存儲RDD以便重用。
    2. 當前RDD默認是存儲於內存,但當內存不足時,RDD會spill到disk。
    3. RDD在須要進行分區把數據分佈於集羣中時會根據每條記錄Key進行分區(如Hash 分區),以此保證兩個數據集在Join時能高效。
  • RDD的內部表示
    在RDD的內部實現中每一個RDD均可以使用5個方面的特性來表示:github

    1. 分區列表(數據塊列表)
    2. 計算每一個分片的函數(根據父RDD計算出此RDD)
    3. 對父RDD的依賴列表
    4. 對key-value RDD的Partitioner【可選】
    5. 每一個數據分片的預約義地址列表(如HDFS上的數據塊的地址)【可選】
  • RDD的存儲級別
    RDD根據useDisk、useMemory、deserialized、replication四個參數的組合提供了11種存儲級別:web

  •       val NONE=newStorageLevel(false,false,false)
          val DISK_ONLY=newStorageLevel(true,false,false)
          val DISK_ONLY_2=newStorageLevel(true,false,false,2)
          val MEMORY_ONLY=newStorageLevel(false,true,true)
          val MEMORY_ONLY_2=newStorageLevel(false,true,true,2)
          val MEMORY_ONLY_SER=newStorageLevel(false,true,false)
          val MEMORY_ONLY_SER_2=newStorageLevel(false,true,false,2)
          val MEMORY_AND_DISK=newStorageLevel(true,true,true)
          val MEMORY_AND_DISK_2=newStorageLevel(true,true,true,2)
          val MEMORY_AND_DISK_SER=newStorageLevel(true,true,false)
          val MEMORY_AND_DISK_SER_2=newStorageLevel(true,true,false,2) 

     

  • RDD定義了各類操做,不一樣類型的數據由不一樣的RDD類抽象表示,不一樣的操做也由RDD進行抽實現。算法

RDD的生成

  • RDD有兩種建立方式:
    一、從Hadoop文件系統(或與Hadoop兼容的其它存儲系統)輸入(例如HDFS)建立。
    二、從父RDD轉換獲得新RDD。
  • 下面來看一從Hadoop文件系統生成RDD的方式,如:val file = spark.textFile("hdfs://..."),file變量就是RDD(實際是HadoopRDD實例),生成的它的核心代碼以下:shell

  •      // SparkContext根據文件/目錄及可選的分片數建立RDD, 這裏咱們能夠看到Spark與Hadoop MapReduce很像 
         // 須要InputFormat, Key、Value的類型,其實Spark使用的Hadoop的InputFormat, Writable類型。
         def textFile(path:String,minSplits:Int=defaultMinSplits):RDD[String]={
             hadoopFile(path,classOf[TextInputFormat],classOf[LongWritable],
             classOf[Text],minSplits).map(pair=>pair._2.toString)}
      
         // 根據Hadoop配置,及InputFormat等建立HadoopRDD  
         newHadoopRDD(this,conf,inputFormatClass,keyClass,valueClass,minSplits)

     

  • 對RDD進行計算時,RDD從HDFS讀取數據時與Hadoop MapReduce幾乎同樣的:數據庫

  • // 根據hadoop配置和分片從InputFormat中獲取RecordReader進行數據的讀取。
        reader=fmt.getRecordReader(split.inputSplit.value,conf,Reporter.NULL)
        val key:K=reader.createKey()
        val value:V=reader.createValue()
        //使用Hadoop MapReduce的RecordReader讀取數據,每一個Key、Value對以元組返回。
        override def getNext()={
        try{
          finished=!reader.next(key,value)
        }catch{
          caseeof:EOFException=>
            finished=true
        }
          (key,value)
        }

     

RDD的轉換與操做

  • val sc=newSparkContext(master,"Example",System.getenv("SPARK_HOME"),
              Seq(System.getenv("SPARK_TEST_JAR")))
       
          val rdd_A=sc.textFile(hdfs://.....)
          val rdd_B=rdd_A.flatMap((line=>line.split("\\s+"))).map(word=>(word,1))
       
          val rdd_C=sc.textFile(hdfs://.....)
          val rdd_D=rdd_C.map(line=>(line.substring(10),1))
          val rdd_E=rdd_D.reduceByKey((a,b)=>a+b)
      
          val rdd_F=rdd_B.jion(rdd_E)
      
          rdd_F.saveAsSequenceFile(hdfs://....)

     

    對於RDD能夠有兩種計算方式:轉換(返回值仍是一個RDD)與操做(返回值不是一個RDD)。
  • 轉換(Transformations) (如:map, filter, groupBy, join等),Transformations操做是Lazy的,也就是說從一個RDD轉換生成另外一個RDD的操做不是立刻執行,Spark在遇到Transformations操做時只會記錄須要這樣的操做,並不會去執行,須要等到有Actions操做的時候纔會真正啓動計算過程進行計算。
  • 操做(Actions) (如:count, collect, save等),Actions操做會返回結果或把RDD數據寫到存儲系統中。Actions是觸發Spark啓動計算的動因。
  • 下面使用一個例子來示例說明Transformations與Actions在Spark的使用。編程

  •  

     

SparkTA11

Lineage(血統)

  • 利用內存加快數據加載,在衆多的其它的In-Memory類數據庫或Cache類系統中也有實現,Spark的主要區別在於它處理分佈式運算環境下的數據容錯性(節點實效/數據丟失)問題時採用的方案。爲了保證RDD中數據的魯棒性,RDD數據集經過所謂的血統關係(Lineage)記住了它是如何從其它RDD中演變過來的。相比其它系統的細顆粒度的內存數據更新級別的備份或者LOG機制,RDD的Lineage記錄的是粗顆粒度的特定數據轉換(Transformation)操做(filter, map, join etc.)行爲。當這個RDD的部分分區數據丟失時,它能夠經過Lineage獲取足夠的信息來從新運算和恢復丟失的數據分區。這種粗顆粒的數據模型,限制了Spark的運用場合,但同時相比細顆粒度的數據模型,也帶來了性能的提高。
  • RDD在Lineage依賴方面分爲兩種Narrow Dependencies與Wide Dependencies用來解決數據容錯的高效性。Narrow Dependencies是指父RDD的每個分區最多被一個子RDD的分區所用,表現爲一個父RDD的分區對應於一個子RDD的分區或多個父RDD的分區對應於一個子RDD的分區,也就是說一個父RDD的一個分區不可能對應一個子RDD的多個分區。Wide Dependencies是指子RDD的分區依賴於父RDD的多個分區或全部分區,也就是說存在一個父RDD的一個分區對應一個子RDD的多個分區。對與Wide Dependencies,這種計算的輸入和輸出在不一樣的節點上,lineage方法對與輸入節點無缺,而輸出節點宕機時,經過從新計算,這種狀況下,這種方法容錯是有效的,不然無效,由於沒法重試,須要向上其祖先追溯看是否能夠重試(這就是lineage,血統的意思),Narrow Dependencies對於數據的重算開銷要遠小於Wide Dependencies的數據重算開銷。

容錯

  • 在RDD計算,經過checkpint進行容錯,作checkpoint有兩種方式,一個是checkpoint data,一個是logging the updates。用戶能夠控制採用哪一種方式來實現容錯,默認是logging the updates方式,經過記錄跟蹤全部生成RDD的轉換(transformations)也就是記錄每一個RDD的lineage(血統)來從新計算生成丟失的分區數據。

資源管理與做業調度

  • Spark對於資源管理與做業調度可使用Standalone(獨立模式),Apache Mesos及Hadoop YARN來實現。 Spark on Yarn在Spark0.6時引用,但真正可用是在如今的branch-0.8版本。Spark on Yarn遵循YARN的官方規範實現,得益於Spark天生支持多種Scheduler和Executor的良好設計,對YARN的支持也就很是容易,Spark on Yarn的大體框架圖。緩存

 Spark架構圖

  • 讓Spark運行於YARN上與Hadoop共用集羣資源能夠提升資源利用率。


編程接口

  • Spark經過與編程語言集成的方式暴露RDD的操做,相似於DryadLINQ和FlumeJava,每一個數據集都表示爲RDD對象,對數據集的操做就表示成對RDD對象的操做。Spark主要的編程語言是Scala,選擇Scala是由於它的簡潔性(Scala能夠很方便在交互式下使用)和性能(JVM上的靜態強類型語言)。
  • Spark和Hadoop MapReduce相似,由Master(相似於MapReduce的Jobtracker)和Workers(Spark的Slave工做節點)組成。用戶編寫的Spark程序被稱爲Driver程序,Dirver程序會鏈接master並定義了對各RDD的轉換與操做,而對RDD的轉換與操做經過Scala閉包(字面量函數)來表示,Scala使用Java對象來表示閉包且都是可序列化的,以此把對RDD的閉包操做發送到各Workers節點。 Workers存儲着數據分塊和享有集羣內存,是運行在工做節點上的守護進程,當它收到對RDD的操做時,根據數據分片信息進行本地化數據操做,生成新的數據分片、返回結果或把RDD寫入存儲系統。 

runtime

Scala

Spark使用Scala開發,默認使用Scala做爲編程語言。編寫Spark程序比編寫Hadoop MapReduce程序要簡單的多,SparK提供了Spark-Shell,能夠在Spark-Shell測試程序。寫SparK程序的通常步驟就是建立或使用(SparkContext)實例,使用SparkContext建立RDD,而後就是對RDD進行操做。如:

1     val sc=newSparkContext(master,appName,[sparkHome],[jars])
2     val textFile=sc.textFile("hdfs://.....")
3     textFile.map(....).filter(.....).....

 

Java

  • Spark支持Java編程,但對於使用Java就沒有了Spark-Shell這樣方便的工具,其它與Scala編程是同樣的,由於都是JVM上的語言,Scala與Java能夠互操做,Java編程接口其實就是對Scala的封裝。如:

  •    JavaSparkContext sc=newJavaSparkContext(...);  
          JavaRDD lines=ctx.textFile("hdfs://...");
          JavaRDD words=lines.flatMap(
            newFlatMapFunction<String,String>(){
               publicIterable call(Strings){
                  returnArrays.asList(s.split(" "));
               }
             }
          );

     

Python

  • 如今Spark也提供了Python編程接口,Spark使用py4j來實現python與java的互操做,從而實現使用python編寫Spark程序。Spark也一樣提供了pyspark,一個Spark的python shell,能夠以交互式的方式使用Python編寫Spark程序。 如:

1 from pyspark import SparkContext
2     sc=SparkContext("local","Job Name",pyFiles=['MyFile.py','lib.zip','app.egg'])
3     words=sc.textFile("/usr/share/dict/words")
4     words.filter(lambdaw:w.startswith("spar")).take(5)

 


使用示例

Standalone模式

  • 爲方便Spark的推廣使用,Spark提供了Standalone模式,Spark一開始就設計運行於Apache Mesos資源管理框架上,這是很是好的設計,可是卻帶了部署測試的複雜性。爲了讓Spark能更方便的部署和嘗試,Spark所以提供了Standalone運行模式,它由一個Spark Master和多個Spark worker組成,與Hadoop MapReduce1很類似,就連集羣啓動方式都幾乎是同樣。
  • 以Standalone模式運行Spark集羣

    • 下載Scala2.9.3,並配置SCALA_HOME
    • 下載Spark代碼(可使用源碼編譯也能夠下載編譯好的版本)這裏下載 編譯好的版本(http://spark-project.org/download/spark-0.7.3-prebuilt-cdh4.tgz
    • 解壓spark-0.7.3-prebuilt-cdh4.tgz安裝包
    • 修改配置(conf/*) slaves: 配置工做節點的主機名 spark-env.sh:配置環境變量。

       

      1 SCALA_HOME=/home/spark/scala-2.9.3
      2 JAVA_HOME=/home/spark/jdk1.6.0_45
      3 SPARK_MASTER_IP=spark1            
      4 SPARK_MASTER_PORT=30111
      5 SPARK_MASTER_WEBUI_PORT=30118
      6 SPARK_WORKER_CORES=2SPARK_WORKER_MEMORY=4g
      7 SPARK_WORKER_PORT=30333
      8 SPARK_WORKER_WEBUI_PORT=30119
      9 SPARK_WORKER_INSTANCES=1

       

    • 把Hadoop配置copy到conf目錄下

    • 在master主機上對其它機器作ssh無密碼登陸

    • 把配置好的Spark程序使用scp copy到其它機器

    • 在master啓動集羣:$SPARK_HOME/start-all.sh 

yarn模式

  • Spark-shell如今還不支持Yarn模式,使用Yarn模式運行,須要把Spark程序所有打包成一個jar包提交到Yarn上運行。目錄只有branch-0.8版本才真正支持Yarn。
  • 以Yarn模式運行Spark

    • 下載Spark代碼:git clonegit://github.com/mesos/spark 

    • 切換到branch-0.8

      cd  spark
      git  checkout - b yarn -- track  origin / yarn
    • 使用sbt編譯Spark並

      $SPARK_HOME/sbt/sbt
      >package
      >assembly
    • 把Hadoop yarn配置copy到conf目錄下

    • 運行測試

SPARK_JAR=./core/target/scala-2.9.3/spark-core-assembly-0.8.0-SNAPSHOT.jar\
./run spark.deploy.yarn.Client--jar examples/target/scala-2.9.3/\
--classspark.examples.SparkPi--args yarn-standalone

 

使用Spark-shell

  • Spark-shell使用很簡單,當Spark以Standalon模式運行後,使用$SPARK_HOME/spark-shell進入shell便可,在Spark-shell中SparkContext已經建立好了,實例名爲sc能夠直接使用,還有一個須要注意的是,在Standalone模式下,Spark默認使用的調度器的FIFO調度器而不是公平調度,而Spark-shell做爲一個Spark程序一直運行在Spark上,其它的Spark程序就只能排隊等待,也就是說同一時間只能有一個Spark-shell在運行。
  • 在Spark-shell上寫程序很是簡單,就像在Scala Shell上寫程序同樣。

    1     scala>val textFile=sc.textFile("hdfs://hadoop1:2323/user/data")
    2     textFile:spark.RDD[String]=spark.MappedRDD@2ee9b6e3
    3  
    4     scala>textFile.count()// Number of items in this RDD
    5     res0:Long=21374
    6  
    7     scala>textFile.first()// First item in this RDD
    8     res1:String=# Spark

編寫Driver程序

    • 在Spark中Spark程序稱爲Driver程序,編寫Driver程序很簡單幾乎與在Spark-shell上寫程序是同樣的,不一樣的地方就是SparkContext須要本身建立。如WorkCount程序以下:

       

       1 import spark.SparkContext
       2 import SparkContext._
       3  
       4 objectWordCount{
       5   def main(args:Array[String]){
       6     if(args.length==0){
       7       println("usage is org.test.WordCount <master>")
       8     }
       9     println("the args: ")
      10     args.foreach(println)
      11  
      12     val hdfsPath="hdfs://hadoop1:8020"
      13  
      14     // create the SparkContext, args(0)由yarn傳入appMaster地址
      15     val sc=newSparkContext(args(0),"WrodCount",
      16     System.getenv("SPARK_HOME"),Seq(System.getenv("SPARK_TEST_JAR")))
      17  
      18     val textFile=sc.textFile(hdfsPath+args(1))
      19  
      20     val result=textFile.flatMap(line=>line.split("\\s+"))
      21         .map(word=>(word,1)).reduceByKey(_+_)
      22  
      23     result.saveAsTextFile(hdfsPath+args(2))
      24   }
      25 }
相關文章
相關標籤/搜索