5.spark core之RDD編程

  spark提供了對數據的核心抽象——彈性分佈式數據集(Resilient Distributed Dataset,簡稱RDD)。RDD是一個分佈式的數據集合,數據能夠跨越集羣中的多個機器節點,被分區並行執行。
  在spark中,對數據的全部操做不外乎建立RDD、轉化已有RDD及調用RDD操做進行求值。spark會自動地將RDD中的數據分發到集羣中並行執行。java

五大特性

  • a list of partitions
      RDD是一個由多個partition(某個節點裏的某一片連續的數據)組成的的list;將數據加載爲RDD時,通常會遵循數據的本地性(通常一個hdfs裏的block會加載爲一個partition)。
  • a function for computing each split
      RDD的每一個partition中都會有function,即函數應用,其做用是實現RDD之間partition的轉換。
  • a list of dependencies on other RDDs
      RDD會記錄它的依賴,爲了容錯(重算,cache,checkpoint),即內存中的RDD操做出錯或丟失時會進行重算。
  • Optionally,a Partitioner for Key-value RDDs
      可選項,若是RDD裏面存的數據是key-value形式,則能夠傳遞一個自定義的Partitioner進行從新分區,例如自定義的Partitioner是基於key進行分區,那則會將不一樣RDD裏面的相同key的數據放到同一個partition裏面。
  • Optionally, a list of preferred locations to compute each split on
      可選項,最優的位置去計算每一個分片,即數據的本地性。

    建立RDD

      spark提供了兩種建立RDD的方式:讀取外部數據源、將驅動器程序中的集合進行並行化。python

    並行化集合

      使用sparkContext的parallelize()方法將集合並行化。
      parallelize()方法第二個參數可指定分區數。spark會爲每一個分區建立一個task任務,一般每一個cpu須要2-4個分區。spark會自動地根據集羣大小設置分區數,也支持經過parallelize()方法的第二個參數手動指定。apache

    scala

    val data = Array(1, 2, 3, 4, 5)
    val distData = sc.parallelize(data)

    java

    List<Integer> data = Arrays.asList(1, 2, 3, 4, 5);
    JavaRDD<Integer> distData = sc.parallelize(data);

    python

    data = [1, 2, 3, 4, 5]
    distData = sc.parallelize(data)

      注:除了開發和測試外,這種方式用得很少。這種方式須要把整個數據集先放到一臺機器的內存中。編程

    讀取外部數據源

      spark可接入多種hadoop支持的數據源來建立分佈式數據集。包括:本地文件系統、HDFS、Cassandra、HBase、Amazon S3等。
      spark支持多種存儲格式,包括textFiles、SequenceFiles及其餘hadoop存儲格式。緩存

    scala

    scala> val distFile = sc.textFile("data.txt")
    distFile: org.apache.spark.rdd.RDD[String] = data.txt MapPartitionsRDD[10] at textFile at <console>:26

    java

    JavaRDD<String> distFile = sc.textFile("data.txt");

    python

    >>> distFile = sc.textFile("data.txt")

RDD操做

  RDD支持兩種操做:轉化操做和行動操做。
算子.png分佈式

轉化操做

  RDD的轉化操做會返回一個新的RDD。轉化操做是惰性求值的,只有行動操做用到轉化操做生成的RDD時,纔會真正進行轉化。
轉化算子.png
  spark使用lineage(血統)來記錄轉化操做生成的不一樣RDD之間的依賴關係。依賴分爲窄依賴(narrow dependencies)和寬依賴(wide dependencies)。ide

  • 窄依賴
    • 子RDD的每一個分區依賴於常數個父分區
    • 輸入輸出一對一,結果RDD的分區結構不變,主要是map、flatMap
    • 輸入輸出一對一,但結果RDD的分區結構發生變化,如union、coalesce
    • 從輸入中選擇部分元素的算子,如filter、distinct、subtract、sample
  • 寬依賴函數

    • 子RDD的每一個分區依賴於全部父RDD分區
    • 對單個RDD基於key進行重組和reduce,如groupByKey、reduceByKey
    • 對兩個RDD基於key進行合併和重組,如join
      轉化算子依賴.pngoop

      行動操做

        行動操做則會向驅動器程序返回結果或把結果寫入外部系統,會觸發實際的計算。
      action算子.png性能

      緩存方式

        RDD經過persist方法或cache方法能夠將前面的計算結果緩存,可是並非這兩個方法被調用時當即緩存,而是觸發後面的action時,該RDD將會被緩存在計算節點的內存中,並供後面重用。
        cache最終也是調用了persist方法,默認的存儲級別是僅在內存存儲一份。
      緩存.jpg
        Spark的存儲級別還有好多種,存儲級別在object StorageLevel中定義的。
      緩存方式.png
        緩存有可能丟失,RDD的緩存容錯機制保證即便緩存丟失也能保證計算正確執行。經過基於RDD的一系列轉換,丟失的數據會被重算,因爲RDD的各個Partition是相對獨立的,所以只須要計算丟失的部分便可,並不須要重算所有Partition。

      容錯機制

      • Lineage機制

        • RDD的Lineage記錄的是粗粒度的特定數據Transformation操做行爲。當RDD的部分分區數據丟失時,能夠經過Lineage來從新運算和恢復丟失的數據分區。這種粗顆粒的數據模型,限制了Spark的運用場合,因此Spark並不適用於全部高性能要求的場景,但同時相比細顆粒度的數據模型,也帶來了性能的提高。

        • Spark Lineage機制是經過RDD的依賴關係來執行的

          • 窄依賴能夠在某個計算節點上直接經過計算父RDD的某塊數據計算獲得子RDD對應的某塊數據。

          • 寬依賴則要等到父RDD全部數據都計算完成後,將父RDD的計算結果進行hash並傳到對應節點上以後才能計算子RDD。寬依賴要將祖先RDD中的全部數據塊所有從新計算,因此在長「血統」鏈特別是有寬依賴的時候,須要在適當的時機設置數據檢查點。
      • Checkpoint機制

        • 簡介

          • 當RDD的action算子觸發計算結束後會執行checkpoint;Task計算失敗的時候會從checkpoint讀取數據進行計算。
        • 實現方式(checkpoint有兩種實現方式,若是代碼中沒有設置checkpoint,則使用local的checkpoint模式,若是設置路徑,則使用reliable的checkpoint模式。)

          • LocalRDDCheckpointData:臨時存儲在本地executor的磁盤和內存上。該實現的特色是比較快,適合lineage信息須要常常被刪除的場景(如GraphX),可容忍executor掛掉。

          • ReliableRDDCheckpointData:存儲在外部可靠存儲(如hdfs),能夠達到容忍driver 掛掉狀況。雖然效率沒有存儲本地高,可是容錯級別最好。

忠於技術,熱愛分享。歡迎關注公衆號:java大數據編程,瞭解更多技術內容。

這裏寫圖片描述

相關文章
相關標籤/搜索