Spark 的核心概念 RDD

1.RDD 概述

1.1 什麼是 RDD ?

RDD(Resilient Distributed Dataset) 叫着 彈性分佈式數據集 ,是Spark 中最基本的抽象,它表明一個不可變、可分區、裏面元素能夠並行計算的集合。php

RDD 具備數據流模型特色:自動容錯、位置感知性調度和可伸縮。java

RDD 容許用戶在執行多個查詢時,顯示地將工做集緩存在內存中,後續的查詢可以重用工做集,這將會極大的提高查詢的效率。linux

咱們能夠認爲 RDD 就是一個代理,咱們操做這個代理就像操做本地集合同樣,不需去關心任務調度、容錯等問題。android

1.2 RDD 的屬性

在 RDD 源碼中這樣來描述 RDD算法

*  - A list of partitions
*  - A function for computing each split
*  - A list of dependencies on other RDDs
*  - Optionally, a Partitioner for key-value RDDs (e.g. to say that the RDD is hash-partitioned)
*  - Optionally, a list of preferred locations to compute each split on (e.g. block locations for an HDFS file)

複製代碼
  1. 一組分片(Partition),即數據集的基本組成單位。 對於RDD來講,每一個分片都會被一個計算任務處理,並決定並行計算的粒度。用戶能夠在建立RDD 的時候指定RDD的分片個數,若是沒有指定,那麼就會採用默認值。默認值就是程序所分配到的CPU Cores 的數目;
  2. 對於RDD來講,每一個分片都會被一個計算任務處理,並決定並行計算的粒度。用戶能夠在建立RDD 的時候指定RDD的分片個數,若是沒有指定,那麼就會採用默認值。默認值就是程序所分配到的CPU Cores 的數目;
  3. RDD 之間互相存在依賴關係。 RDD 的每次轉換都會生成一個新的 RDD ,因此 RDD 以前就會造成相似於流水線同樣的先後依賴關係。在部分分區數據丟失時,Spark 能夠經過這個依賴關係從新計算丟失部分的分區數據,而不是對 RDD 的全部分區進行從新計算。
  4. 一個Partitioner ,即 RDD 的分片函數 。當前Spark 中實現了兩種類型的分片函數,一個是基於哈希的 HashPartitioner ,另一個是基於範圍的 RangePartitioner。只有對於key-value的RDD ,纔會有 Partitioner,非 key-value 的RDD 的 Partitioner 的值是None。Partitioner 函數不但決定了RDD 自己的分片數量,也決定了 Parent RDD Shuffle 輸出時的分片數量。
  5. 一個列表,存儲存取每一個Partition 的優先位置(preferred location)。 對於一個HDFS 文件來講,這個列表保存的就是每一個 Partition 所在的塊位置。安裝「移動數據不如移動計算」的理念,Spark 在進行任務調度的時候,會盡量地將計算任務分配到其所要處理數據塊的存儲位置。

2 建立 RDD

2.1 由一個存在的 Scala 集合進行建立

#經過並行化scala集合建立RDD,通常在測試的時候使用
scala> var rdd = sc.parallelize(List(1,2,3,4,5,6,7,8,9))
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:24
複製代碼

2.2 由外部的存儲系統的數據集建立,包括本地的文件系統,還有全部 Hadoop 支持的數據集,好比 HDFS、Cassandra、Hbase

var rdd1 = sc.textFile("/root/words.txt")
var rdd2 = sc.textFile("hdfs:192.168.80.131:9000/words.text")
複製代碼

2.3 調用一個已經存在了的RDD 的 Transformation,會生成一個新的 RDD。

3 RDD 的編程 API

3.1 Transformation

這種 RDD 中的全部轉換都是延遲加載的,也就是說,他們並不會直接就計算結果。相反的,他們只是記住這些應用到基礎數據集(例如一個文件)上的轉換動做。只有當發生一個返回結果的 Driver 的動做時,這些操做纔會真正的運行。這種設計會讓Spark 更加有效率的運行。apache

經常使用的 Transformation 操做:編程

轉換 含義
map(func) 返回一個新的RDD,該RDD由每個輸入元素通過func函數轉換後組成
filter(func) 返回一個新的RDD,該RDD由通過func函數計算後返回值爲true的輸入元素組成
flatMap(func) 相似於map,可是每個輸入元素能夠被映射爲0或多個輸出元素(因此func應該返回一個序列,而不是單一元素)
mapPartitions(func) 相似於map,但獨立地在RDD的每個分片上運行,所以在類型爲T的RDD上運行時,func的函數類型必須是Iterator[T] => Iterator[U]
mapPartitionsWithIndex(func) 相似於mapPartitions,但func帶有一個整數參數表示分片的索引值,所以在類型爲T的RDD上運行時,func的函數類型必須是(Int, Interator[T]) => Iterator[U]
sample(withReplacement, fraction, seed) 根據fraction指定的比例對數據進行採樣,能夠選擇是否使用隨機數進行替換,seed用於指定隨機數生成器種子
union(otherDataset) 對源RDD和參數RDD求並集後返回一個新的RDD
intersection(otherDataset) 對源RDD和參數RDD求交集後返回一個新的RDD
distinct([numTasks])) 對源RDD進行去重後返回一個新的RDD
groupByKey([numTasks]) 在一個(K,V)的RDD上調用,返回一個(K, Iterator[V])的RDD
reduceByKey(func, [numTasks]) 在一個(K,V)的RDD上調用,返回一個(K,V)的RDD,使用指定的reduce函數,將相同key的值聚合到一塊兒,與groupByKey相似,reduce任務的個數能夠經過第二個可選的參數來設置
aggregateByKey(zeroValue)(seqOp, combOp, [numTasks]) 先按分區聚合 再總的聚合 每次要跟初始值交流 例如:aggregateByKey(0)(+,+) 對k/y的RDD進行操做
sortByKey([ascending], [numTasks]) 在一個(K,V)的RDD上調用,K必須實現Ordered接口,返回一個按照key進行排序的(K,V)的RDD
sortBy(func,[ascending], [numTasks]) 與sortByKey相似,可是更靈活
join(otherDataset, [numTasks]) 在類型爲(K,V)和(K,W)的RDD上調用,返回一個相同key對應的全部元素對在一塊兒的(K,(V,W))的RDD
cogroup(otherDataset, [numTasks]) 在類型爲(K,V)和(K,W)的RDD上調用,返回一個(K,(Iterable,Iterable))類型的RDD
cartesian(otherDataset) 笛卡爾積
pipe(command, [envVars]) 調用外部程序
coalesce(numPartitions) 從新分區 第一個參數是要分多少區,第二個參數是否shuffle 默認false ;少分區變多分區 true ; 多分區變少分區 false
repartition(numPartitions) 從新分區 必須shuffle 參數是要分多少區 少變多
repartitionAndSortWithinPartitions(partitioner) 從新分區+排序 比先分區再排序效率高 對K/V的RDD進行操做

3.2 Action

觸發代碼的運行操做,咱們一個Spark 應用,至少須要一個 Action 操做。windows

動做 含義
reduce(func) 經過func函數彙集RDD中的全部元素,這個功能必須是課交換且可並聯的
collect() 在驅動程序中,以數組的形式返回數據集的全部元素
count() 返回RDD的元素個數
first() 返回RDD的第一個元素(相似於take(1))
take(n) 返回一個由數據集的前n個元素組成的數組
takeSample(withReplacement,num, [seed]) 返回一個數組,該數組由從數據集中隨機採樣的num個元素組成,能夠選擇是否用隨機數替換不足的部分,seed用於指定隨機數生成器種子
takeOrdered(n, [ordering])
saveAsTextFile(path) 將數據集的元素以textfile的形式保存到HDFS文件系統或者其餘支持的文件系統,對於每一個元素,Spark將會調用toString方法,將它裝換爲文件中的文本
saveAsSequenceFile(path) 將數據集中的元素以Hadoop sequencefile的格式保存到指定的目錄下,可使HDFS或者其餘Hadoop支持的文件系統。
saveAsObjectFile(path)
countByKey() 針對(K,V)類型的RDD,返回一個(K,Int)的map,表示每個key對應的元素個數。
foreach(func) 在數據集的每個元素上,運行函數func進行更新。
foreachPartition(func) 在每一個分區上,運行函數 func

3.3 Spark WordCount 代碼示例

執行流程圖: api

wc執行流程圖

pom.xml 依賴數組

<!-- 導入scala的依賴 -->
<dependency>
    <groupId>org.scala-lang</groupId>
    <artifactId>scala-library</artifactId>
    <version>2.2.0</version>
</dependency>
<!-- 導入spark的依賴 -->
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.11</artifactId>
    <version>2.2.0</version>
</dependency>

<!-- 指定hadoop-client API的版本 -->
<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-client</artifactId>
    <version>2.6.0</version>
</dependency>
複製代碼

scala 版本代碼實現:

package com.zhouq.spark

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

/**
  * scala 版本實現 wc
  *
  */
object ScalaWordCount {
  def main(args: Array[String]): Unit = {
    //這行代碼是由於我在windows 上直接跑,須要去讀取 hadoop 上的文件,設置個人用戶名。若是是linux 環境能夠不設置。視狀況而定
    System.setProperty("HADOOP_USER_NAME", "root")
    //建立spark 配置,設置應用程序名字
//    val conf = new SparkConf().setAppName("scalaWordCount")
    val conf = new SparkConf().setAppName("scalaWordCount").setMaster("local[4]")

//    conf.set("spark.testing.memory","102457600")
    //建立spark 執行的入口
    val sc = new SparkContext(conf)

    //指定之後從哪裏讀取數據建立RDD (彈性分佈式數據集)
    //取到一行數據
    val lines: RDD[String] = sc.textFile(args(0))

    //切分壓平
    val words: RDD[String] = lines.flatMap(_.split(" "))

    //按單詞和一組合
    val wordAndOne: RDD[(String, Int)] = words.map((_, 1))

    //按key 進行聚合
    val reduced: RDD[(String, Int)] = wordAndOne.reduceByKey(_ + _)

    // 排序, false 表示倒序
    val sorted = reduced.sortBy(_._2, false)

    //將結果保存到hdfs中
    sorted.saveAsTextFile(args(1))

    //釋放資源
    sc.stop()
  }
}
複製代碼

Java7 版本:

package com.zhouq.spark;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.PairFunction;
import scala.Tuple2;
import java.util.Arrays;
import java.util.Iterator;

/**
* Java 版WordCount
*/
public class JavaWordCount {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setAppName("JavaWordCount");
        //建立SparkContext
        JavaSparkContext jsc = new JavaSparkContext(conf);
        //指定讀取數據的位置
        JavaRDD<String> lines = jsc.textFile(args[0]);

        //切分壓平
        JavaRDD<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public Iterator<String> call(String line) throws Exception{
                return Arrays.asList(line.split(" ")).iterator();
            }
        });

        //將單詞進行組合 (a,1),(b,1),(c,1),(a,1)
        JavaPairRDD<String, Integer> wordAndOne = words.mapToPair(new PairFunction<String, String, Integer>() {
            @Override
            public Tuple2<String, Integer> call(String tp) throws Exception {
                return new Tuple2<>(tp, 1);
            }
        });

        //先交換再排序,由於 只有groupByKey 方法
        JavaPairRDD<Integer, String> swaped = wordAndOne.mapToPair(new PairFunction<Tuple2<String, Integer>, Integer, String>() {
            @Override
            public Tuple2<Integer, String> call(Tuple2<String, Integer> tp) throws Exception {
//                return new Tuple2<>(tp._2, tp._1);
                return tp.swap();
            }
        });

        //排序
        JavaPairRDD<Integer, String> sorted = swaped.sortByKey(false);

        //再次交換順序
        JavaPairRDD<String, Integer> result = sorted.mapToPair(new PairFunction<Tuple2<Integer, String>, String, Integer>() {
            @Override
            public Tuple2<String, Integer> call(Tuple2<Integer, String> tp) throws Exception {
                return tp.swap();
            }
        });

        //輸出到hdfs
        result.saveAsTextFile(args[1]);

        jsc.stop();
    }
}
複製代碼

Java8 版本:

package com.zhouq.spark;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.PairFunction;
import scala.Tuple2;
import java.util.Arrays;

/**
* Java Lambda 表達式版本的  WordCount
*/
public class JavaLambdaWordCount {

    public static void main(String[] args) {

        SparkConf conf = new SparkConf().setAppName("JavaWordCount");
        //建立SparkContext
        JavaSparkContext jsc = new JavaSparkContext(conf);
        //指定讀取數據的位置
        JavaRDD<String> lines = jsc.textFile(args[0]);

        //切分壓平
//        lines.flatMap(line -> Arrays.asList(line.split(" ")).iterator());
        JavaRDD<String> words = lines.flatMap((FlatMapFunction<String, String>) line -> Arrays.asList(line.split(" ")).iterator());

        //將單詞進行組合 (a,1),(b,1),(c,1),(a,1)
//        words.mapToPair(tp -> new Tuple2<>(tp,1));
        JavaPairRDD<String, Integer> wordAndOne = words.mapToPair((PairFunction<String, String, Integer>) tp -> new Tuple2<>(tp, 1));

        //先交換再排序,由於 只有groupByKey 方法
//        swaped.mapToPair(tp -> tp.swap());
        JavaPairRDD<Integer, String> swaped = wordAndOne.mapToPair((PairFunction<Tuple2<String, Integer>, Integer, String>) tp -> {
//                return new Tuple2<>(tp._2, tp._1);
            return tp.swap();
        });

        //排序
        JavaPairRDD<Integer, String> sorted = swaped.sortByKey(false);

        //再次交換順序
//        sorted.mapToPair(tp -> tp.swap());
        JavaPairRDD<String, Integer> result = sorted.mapToPair((PairFunction<Tuple2<Integer, String>, String, Integer>) tp -> tp.swap());

        //輸出到hdfs
        result.saveAsTextFile(args[1]);

        jsc.stop();
    }
}
複製代碼

4 RDD 的依賴關係

RDD 和它依賴的 父 RDD(可能有多個) 的關係有兩種不一樣的類型,即 窄依賴(narrow dependency)和寬依賴(wide dependency)。

在這裏插入圖片描述

窄依賴:窄依賴指的是每個父 RDD 的 Partition 最多被子 RDD 的一個分區使用。能夠比喻爲獨生子女。 寬依賴:寬依賴是多個字 RDD 的Partition 會依賴同一個父 RDD 的 Partition

5 RDD 的持久化

5.1 RDD 的 cache(持久化)

Spark中最重要的功能之一是跨操做在內存中持久化(或緩存)數據集。當您持久保存RDD時,每一個節點都會存儲它在內存中計算的任何分區,並在該數據集(或從中派生的數據集)的其餘操做中重用它們。這使得將來的行動更快(一般超過10倍)。緩存是迭代算法和快速交互使用的關鍵工具。

您可使用persist()或cache()方法標記要保留的RDD 。第一次在動做中計算它,它將保留在節點的內存中。Spark的緩存是容錯的 - 若是丟失了RDD的任何分區,它將使用最初建立它的轉換自動從新計算。

5.2 何時咱們須要持久化?

  1. 要求的計算速度快
  2. 集羣的資源要足夠大
  3. 重要: cache 的數據會屢次觸發Action
  4. 建議先進行數據過濾,而後將縮小範圍後的數據再cache 到內存中.

5.3 如何使用

使用 rdd.persist()或者rdd.cache()

val lines: RDD[String] = sc.textFile("hdfs://xxx/user/accrss")
//使用cache 方法來緩存數據到內存
val cache = lines.cache()
//注意查看下面兩次count 的時間
cached.count
cached.count

複製代碼

5.4 數據緩存的存儲級別 StorageLevel

咱們在 StorageLevel.scala 源碼中能夠看到:

val NONE = new StorageLevel(false, false, false, false)
val DISK_ONLY = new StorageLevel(true, false, false, false)
val DISK_ONLY_2 = new StorageLevel(true, false, false, false, 2)
val MEMORY_ONLY = new StorageLevel(false, true, false, true)
val MEMORY_ONLY_2 = new StorageLevel(false, true, false, true, 2)
val MEMORY_ONLY_SER = new StorageLevel(false, true, false, false)
val MEMORY_ONLY_SER_2 = new StorageLevel(false, true, false, false, 2)
val MEMORY_AND_DISK = new StorageLevel(true, true, false, true)
val MEMORY_AND_DISK_2 = new StorageLevel(true, true, false, true, 2)
val MEMORY_AND_DISK_SER = new StorageLevel(true, true, false, false)
val MEMORY_AND_DISK_SER_2 = new StorageLevel(true, true, false, false, 2)
val OFF_HEAP = new StorageLevel(true, true, true, false, 1)
複製代碼

解釋一下各個參數的意思:

第一個參數表示: 放到磁盤 第二個參數表示: 放到內存 第三個參數表示: 磁盤中的數據是否以Java 對象的方式保存,true 表示是, false表示以序列化的方式存放 第四個參數表示: 內存中的數據是否以Java 對象的方式保存,true 表示是, false表示以序列化的方式存放 第五個參數表示: 存放幾份數據(目的是爲了怕executor 出現故障致使分區數據丟失,當從新分配任務時,去另外的機器讀取備份數據進行從新計算)

OFF_HEAP : 堆外內存,以序列化的格式存儲RDD到Tachyon(一個分佈式內存存儲系統)中

5.5 如何選擇存儲級別

Spark的多個存儲級別意味着在內存利用率和cpu利用效率間的不一樣權衡。咱們推薦經過下面的過程選擇一個合適的存儲級別:

  1. 若是你的RDD適合默認的存儲級別(MEMORY_ONLY),就選擇默認的存儲級別。由於這是cpu利用率最高的選項,會使RDD上的操做盡量的快。
  2. 若是不適合用默認的級別,選擇MEMORY_ONLY_SER。選擇一個更快的序列化庫提升對象的空間使用率,可是仍可以至關快的訪問。
  3. 除非函數計算RDD的花費較大或者它們須要過濾大量的數據,不要將RDD存儲到磁盤上,不然,重複計算一個分區就會和重磁盤上讀取數據同樣慢。
  4. 若是你但願更快的錯誤恢復,能夠利用重複(replicated)存儲級別。全部的存儲級別均可以經過重複計算丟失的數據來支持完整的容錯,可是重複的數據可以使你在RDD上繼續運行任務,而不須要重複計算丟失的數據。
  5. 在擁有大量內存的環境中或者多應用程序的環境中,OFF_HEAP具備以下優點:
    1. 它運行多個執行者共享Tachyon中相同的內存池
    2. 它顯著地減小垃圾回收的花費
    3. 若是單個的執行者崩潰,緩存的數據不會丟失

5.6 刪除 cache

Spark自動的監控每一個節點緩存的使用狀況,利用最近最少使用原則刪除老舊的數據。若是你想手動的刪除RDD,可使用 RDD.unpersist()方法

5.7 RDD 的 checkpoint機制

咱們除了把數據緩存到內存中,還能夠把數據緩存到HDFS 中,保證中間數據不丟失.

何時咱們須要作chechpoint?

  1. 作複雜的迭代計算,要求保證數據安全,不丟失
  2. 對速度要求不高(跟 cache 到內存進行對比)
  3. 將中間結果保存到 hdfs 中

怎麼作 checkpoint ?

首先設置 checkpoint 目錄,而後再執行計算邏輯,再執行 checkpoint() 操做。

下面代碼使用cache 和 checkpoint 兩種方式實現計算每門課最受歡迎老師的 topN

package com.zhouq.spark

import java.net.URL
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

/**
  * 求每門課程最受歡迎老師TopN  --2
  *   -- 使用cache
  *   -- 使用checkpoint 通常設置hdfs 目錄
  */
object GroupFavTeacher2_cache_checkpoint {
  def main(args: Array[String]): Unit = {
    //前 N
    val topN = args(1).toInt
    //學科集合
    val subjects = Array("bigdata", "javaee", "php")
    val conf = new SparkConf().setAppName("FavTeacher").setMaster("local[4]")
    //建立spark 執行入口
    val sc = new SparkContext(conf)
    //checkpoint 得先設置 sc 的checkpoint 的dir
//    sc.setCheckpointDir("hdfs://hdfs://hadoop1:8020/user/root/ck20190215")

    //指定讀取數據
    val lines: RDD[String] = sc.textFile(args(0))
    val subjectTeacherAndOne: RDD[((String, String), Int)] = lines.map(line => {
      val index = line.lastIndexOf("/")
      var teacher = line.substring(index + 1)
      var httpHost = line.substring(0, index)
      var subject = new URL(httpHost).getHost.split("[.]")(0)
      ((subject, teacher), 1)
    })
    //將學科,老師聯合當作key
    val reduced: RDD[((String, String), Int)] = subjectTeacherAndOne.reduceByKey(_ + _)

    //第一種使用cache RDD 把數據緩存在內存中.標記爲cache 的RDD 之後被反覆使用,才使用cache
    val cached: RDD[((String, String), Int)] = reduced.cache()

    //第二種 使用checkpoint,得先設置 sc 的 checkpointDir
//   val cached: RDD[((String, String), Int)] = reduced.checkpoint()

    /**
      * 先對學科進行過濾,而後再進行排序,調用RDD 的sortBy進行排序,避免scala 的排序當數據量大時,內存不足的狀況.
      * take 是Action 操做,每次take 都會進行一次任務提交,具體查看日誌打印狀況
      */
    for (sub <- subjects) {
      //過濾出當前的學科
      val filtered: RDD[((String, String), Int)] = cached.filter(_._1._1 == sub)
      //使用RDD 的 sortBy ,內存+磁盤排序,避免scala 中的排序因內存不足致使異常狀況.
      //take 是Action 的,因此每次循環都會觸發一次提交任務,祥見日誌打印狀況
      val favTeacher: Array[((String, String), Int)] = filtered.sortBy(_._2, false).take(topN)
      println(favTeacher.toBuffer)
    }

    /**
      * 前面cache的數據已經計算完了,後面還有不少其餘的指標要計算
      * 後面計算的指標也要觸發不少次Action,最好將數據緩存到內存
      * 原來的數據佔用着內存,把原來的數據釋放掉,才能緩存新的數據
      */

    //把原來緩存的數據釋放掉
    cached.unpersist(true)

    sc.stop()
  }
}
複製代碼

6 DAG 的生成

DAG(Directed Acyclic Graph)叫作有向無環圖,原始的RDD經過一系列的轉換就就造成了DAG,根據RDD之間的依賴關係的不一樣將DAG劃分紅不一樣的Stage,對於窄依賴,partition的轉換處理在Stage中完成計算。對於寬依賴,因爲有Shuffle的存在,只能在parent RDD處理完成後,才能開始接下來的計算,所以寬依賴是劃分Stage的依據。

在這裏插入圖片描述

微信公衆號文章連接:Spark RDD

有興趣歡迎關注,你們一塊兒交流學習。

在這裏插入圖片描述
相關文章
相關標籤/搜索