大數據篇:Spark

大數據篇:Spark

  • Spark是什麼

Spark是一個快速(基於內存),通用,可擴展的計算引擎,採用Scala語言編寫。2009年誕生於UC Berkeley(加州大學伯克利分校,CAL的AMP實驗室),2010年開源,2013年6月進入Apach孵化器,2014年成爲Apach頂級項目,目前有1000+個活躍者。就是說用Spark就對了。html

Spark支持Scala,Java,R,Python語言,並提供了幾十種(目前80+種)高性能的算法,這些若是讓咱們本身來作,幾乎不可能。java

Spark獲得衆多公司支持,如:阿里、騰訊、京東、攜程、百度、優酷、土豆、IBM、Cloudera、Hortonworks等。mysql

  • 若是沒有Spark

解決MapReduce慢的問題而誕生,官網解釋比一樣的MapReduce任務快100倍!linux

spark.apache.org算法

1 內置模塊

機器學習(MLlib),圖計算(GraphicX),實時處理(SparkStreaming),SQL解析(SparkSql)sql

1.1 集羣資源管理

Spark設計爲能夠高效的在一個計算節點到數千個計算節點之間伸縮計算,爲了實現這樣的要求,同時得到最大靈活性,Spark支持在各類集羣資源管理器上運行,目前支持的3種以下:(上圖中下三個)shell

  1. Hadoop YARN(國內幾乎都用)
  2. Apach Mesos(國外使用較多)
  3. Standalone(Spark自帶的資源調度器,須要在集羣中的每臺節點上配置Spark)

1.2 Spark Core

實現了Spark的基本功能,包含任務調度、內存管理、錯誤恢復、與存儲系統交互等模塊。其中還包含了對彈性分佈式數據集(RDD:Resilient Distributed DataSet)的API定義數據庫

1.3 Spark SQL

是Spark用來操做結構化數據的程序包,經過Spark SQL 咱們能夠使用SQL或者HQL來查詢數據。且支持多種數據源:Hive、Parquet、Json等apache

1.4 Spark Streaming

是Spark提供的對實時數據進行流式計算的組件編程

1.5 Spark MLlib

提供常見的機器學習功能和程序庫,包括分類、迴歸、聚類、協同過濾等。還提供了模型評估、數據導入等額外的支持功能。

2 運行模式

2.1 核心概念介紹

  • Master

    • Spark特有的資源調度系統Leader,掌控整個集羣資源信息,相似於Yarn框架中的ResourceManager
    • 監聽Worker,看Worker是否正常工做
    • Master對Worker、Application等的管理(接收Worker的註冊並管理全部的Worker,接收Client提交的Application,調度等待Application並向Worker提交)
  • Worker

    • Spark特有的資源調度Slave,有多個,每一個Slave掌管着全部節點的資源信息,相似Yarn框架中的NodeManager
    • 經過RegisterWorker註冊到Master
    • 定時發送心跳給Master
    • 根據Master發送的Application配置進程環境,並啓動ExecutorBackend(執行Task所需的進程)
  • Driver

    • Spark的驅動器,是執行開發程序中的main方法的線程
    • 負責開發人員編寫SparkContext、RDD,以及進行RDD操做的代碼執行,若是使用Spark Shell,那麼啓動時後臺自啓動了一個Spark驅動器,預加載一個叫作sc的SparkContext對象,若是驅動器終止,那麼Spark應用也就結束了。
    • 4大主要職責:
      • 將用戶程序轉化爲做業(Job)
      • 在Executor之間調度任務(Task)
      • 跟蹤Executor的執行狀況
      • 經過UI展現查詢運行狀況
  • Excutor

    • Spark Executor是一個工做節點,負責在Spark做業中運行任務,任務間相互獨立。Spark應用啓動時,Executor節點被同時啓動,而且始終伴隨着整個Spark應用的生命週期而存在,若是有Executor節點發生了故障或崩潰,Spark應用也能夠繼續執行,會將出錯節點上的任務調度到其餘Executor節點上繼續運行
    • 兩個核心功能:
      • 負責運行組成Spark應用的任務,並將結果返回給驅動器(Driver)
      • 它經過自身塊管理器(BlockManager)爲用戶程序中要求緩存的RDD提供內存式存儲。RDD是直接存在Executor進程內的,所以任務能夠在運行時充分利用緩存數據加速運算。
  • RDDs

    • Resilient Distributed DataSet:彈性分佈式數據集
    • 一旦擁有SparkContext對象,就能夠用它來建立RDD
  • 通用流程圖

2.2 WordCount案例

  • Spark Shell方式
#建立word.txt文件
vim word.txt
#--->
hadoop hello spark
spark word
hello hadoop spark
#---<
#上傳HDFS集羣
hadoop dfs -put word.txt /
#連接客戶端
spark-shell

sc.textFile("/word.txt").flatMap(line => line.split(' ')).map((_,1)).reduceByKey(_ + _).collect

每一個Spark應用程序都包含一個驅動程序,驅動程序負責把並行操做發佈到集羣上,驅動程序包含Spark應用中的主函數,定義了分佈式數據集以應用在集羣中,在前面的wordcount案例中,spark-shell就是咱們的驅動程序,因此咱們鍵入咱們任何想要的操做,而後由它負責發佈,驅動程序經過SparkContext對象來訪問Spark,SparkContext對象至關於一個到Spark集羣的連接

2.3 Job劃分和調度

  • Application應用
    • 一個SparkContext就是一個Application
  • Job做業:
    • 一個行動算子(Action)就是一個Job
  • Stage階段:
    • 一次寬依賴(一次shuffle)就是一個Stage,劃分是從後往前劃分
  • Task任務:
    • 一個核心就是一個Task,體現任務的並行度,經常根據核心數的1.5倍進行設置

  • 使用WordCount案例分析

一個行動算子collect(),一個job

一次寬依賴shuffle算子reduceByKey(),切分紅2個Stage階段

Stage階段,默認文件被切分紅2份,因此有2個task

Stage階段0

Stage階段1

2.4 Shuffle洗牌

2.4.1 ShuffleMapStage And ResultStage

  • 在劃分stage時,最後一個stage稱爲FinalStage,本質上是一個ResultStage對象,前面全部的stage被稱爲ShuffleMapStage

  • ShuffleMapStage 的結束伴隨着shuffle文件寫磁盤

  • ResultStage對應代碼中的action算子,即將一個函數應用在RDD的各個Partition(分區)的數據集上,意味着一個Job運行結束

2.4.2 HashShuffle

  • 未優化HashShuffle流程圖:目前已經沒有了

如上圖,最終結果會有12個小文件

  • 優化後HashShuffle流程圖

如上圖,最終結果會有6個小文件,比未優化前少了一半

2.4.3 SortShuffle

該模式下,數據會先寫入一個數據結果,reduceByKey寫入Map,一邊經過Map局部聚合,一邊寫入內存,

Join算子寫入ArrayList直接寫入內存中,而後須要判斷是否達到閥值,若是達到就會將內存數據寫入磁盤,釋放內存資源

2.4.4 Bypass SortShuffle

  • Bypass SortShuffle運行機制觸發條件
    • shuffle map task 數量小於 spark.shuffle.sort.bypassMargeThreshold參數的值,默認爲200
    • 不是聚合類的shuffle算子

2.5 Submit語法

spark-submit \
--class <main-calss> \
--master <master-url> \
--deploy-mode <deploy-mode> \
--conf <key>=<value> \
...  #其餘 options
<application-jar> \
[application-arguments]
  • --class:應用啓動類全類名(如:org.apache.spark.examples.SparkPi)
  • --master:指定master地址,默認本機Local(本地通常使用Local[*],集羣通常使用yarn)
  • --deploy-mode:是否發佈到驅動worker節點(參數:cluster),或者做爲一個本地客戶端(參數:client),默認本地client
  • --conf:任意Spark配置屬性,格式key=value,如包含空格,能夠加引號"key=value"
  • application-jar:打包好的應用程序jar,包含依賴,這個URL在集羣中全局課件,如HDFS上的jar->hdfs://path;如linux上的jar->file://path 且全部節點路徑都須要包含這個jar
  • application-arguments:給main()方法傳參數
  • --executor-memory 1G:指定每一個executor可用內存爲1G
  • --total-executor-cores 6:指定全部executor使用的cpu核數爲6個
  • --executor-cores 2:表示每一個executor使用的cpu的核數2個

2.6 Local模式

Local模式就是在一臺計算機上運行Spark,一般用於開發中。(單機)

  • Submit提交方式
spark-submit \
--class org.apache.spark.examples.SparkPi \
--master local[*] \
/opt/cloudera/parcels/CDH-6.2.0-1.cdh6.2.0.p0.967373/lib/spark/examples/jars/spark-examples_2.11-2.4.0-cdh6.2.0.jar 100

2.7 Standalone模式

構建一個由 Master + Slave 構成的Spark集羣,Spark運行在集羣中,只依賴Spark,不依賴別的組件(如:Yarn)。(獨立的Spark集羣)

#連接客戶端
spark-shell --master spark://cdh01.cm:7337

參考wordCount案例

  • Standalone-Client流程圖

  • Standalone-Cluster流程圖

2.8 Yarn模式

Spark客戶端能夠直接鏈接Yarn,不須要構建Spark集羣。

有yarn-client和yarn-cluster兩種模式,主要區別在:Driver程序的運行節點不一樣。

yarn-client:Driver程序運行在客戶端,適用於交互、調試,但願當即看見APP輸出

yarn-cluster:Driver程序運行在由ResourceManager啓動的ApplicationMaster上,適用於生產環境

  • Yarn-Client流程圖

  • Yarn-Cluster流程圖

  • 客戶端模式:Driver是在Client端,日誌結果能夠直接在後臺看見
spark-submit \
--class org.apache.spark.examples.SparkPi \
--master yarn \
--deploy-mode client \
/opt/cloudera/parcels/CDH-6.2.0-1.cdh6.2.0.p0.967373/lib/spark/examples/jars/spark-examples_2.11-2.4.0-cdh6.2.0.jar 100
  • 集羣模式:Driver是在NodeManager端,日誌結果須要經過監控日誌查看
spark-submit \
--class org.apache.spark.examples.SparkPi \
--master yarn \
--deploy-mode cluster \
/opt/cloudera/parcels/CDH-6.2.0-1.cdh6.2.0.p0.967373/lib/spark/examples/jars/spark-examples_2.11-2.4.0-cdh6.2.0.jar 100

3 使用IDEA開發Spark

  • pom.xml
<dependencies>
        <!-- scala -->
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>2.11.12</version>
        </dependency>
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-compiler</artifactId>
            <version>2.11.12</version>
        </dependency>
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-reflect</artifactId>
            <version>2.11.12</version>
        </dependency>
        <!-- Spark Core -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.11</artifactId>
            <version>2.4.0</version>
        </dependency>
        <!-- Spark SQL -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.11</artifactId>
            <version>2.4.0</version>
        </dependency>
        <!-- Spark On Hive -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-hive_2.11</artifactId>
            <version>2.4.0</version>
        </dependency>
        <!-- Hbase On Spark-->
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-spark</artifactId>
            <version>2.1.0-cdh6.2.0</version>
        </dependency>
        <!-- Spark Streaming -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming_2.11</artifactId>
            <version>2.4.0</version>
        </dependency>
        <!-- Spark Streaming Kafka-->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
            <version>2.4.0</version>
        </dependency>
        <!-- Kafka -->
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka_2.12</artifactId>
            <version>2.1.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-tools</artifactId>
            <version>2.1.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>2.1.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-examples</artifactId>
            <version>2.1.0</version>
        </dependency>

        <!--mysql依賴的jar包-->
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>5.1.47</version>
        </dependency>

    </dependencies>

    <repositories>
        <repository>
            <id>cloudera</id>
            <url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
        </repository>
    </repositories>
    <build>
        <plugins>
            <!-- 在maven項目中既有java又有scala代碼時配置 maven-scala-plugin 插件打包時能夠將兩類代碼一塊兒打包 -->
            <plugin>
                <groupId>org.scala-tools</groupId>
                <artifactId>maven-scala-plugin</artifactId>
                <version>2.15.2</version>
                <executions>
                    <execution>
                        <goals>
                            <goal>compile</goal>
                            <goal>testCompile</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <!-- MAVEN 編譯使用的JDK版本 -->
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.3</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                    <encoding>UTF-8</encoding>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-assembly-plugin</artifactId>
                <version>3.0.0</version>
                <executions>
                    <execution>
                        <phase>package</phase><!--綁定到package生命週期階段-->
                        <goals>
                            <goal>single</goal><!--只運行一次-->
                        </goals>
                    </execution>
                </executions>
                <configuration>
                    <!--<finalName></finalName>&lt;!&ndash;主類入口&ndash;&gt;-->
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
            </plugin>

            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-surefire-plugin</artifactId>
                <version>2.10</version>
                <configuration>
                    <skip>true</skip>
                </configuration>
            </plugin>
        </plugins>
    </build>
  • WorkCount案例

    1. 在resources文件夾下,新建word.csv文件
    hello,spark
    hello,scala,hadoop
    hello,hdfs
    hello,spark,hadoop
    hello
    1. WorkCount.scala
    import org.apache.spark.{SparkConf, SparkContext}
    
    object WorkCount {
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf().setAppName("WorkCount").setMaster("local[*]")
        val sc = new SparkContext(conf)
        val tuples: Array[(String, Int)] = sc.textFile(ClassLoader.getSystemResource("word.csv").getPath)
          .flatMap(_.split(","))
          .map((_, 1))
          .reduceByKey(_ + _)
          .collect()
        tuples.foreach(println)
      }
    }

    結果:

    (scala,1)
    (hello,5)
    (spark,2)
    (hadoop,2)
    (hdfs,1)

4 Spark Core

4.1 什麼是RDD

Resilient Distributed DataSet:彈性分佈式數據集,是Spark中最基本數據抽象,能夠理解爲數據集合。

在代碼中是一個抽象類,它表明一個彈性的、不可變的、可分區,裏面的元素可並行計算的集合。

4.2 RDD的五個主要特性

  1. 分區性
    • 多個分區,分區能夠當作是數據集的基本組成單位
    • 對於RDD來講,每一個分區都會被一個計算任務處理,並決定了並行計算的粒度。
    • 用戶能夠在建立RDD時,指定RDD的分區數,若是沒有指定,那麼採用默認值(程序所分配到的CPU Coure的數目)
    • 每一個分配的儲存是由BlockManager實現的,每一個分區都會被邏輯映射成BlockManager的一個Block,而這個Block會被一個Task負責計算。
  2. 計算每一個分區的函數
    • Spark中RDD的計算是以分區爲單位的,每一個RDD都會實現compute函數以達到這個目的
  3. 依賴性
    • RDD的每次轉換都會生成一個新的RDD,因此RDD之間會造成相似於流水線同樣的先後依賴關係,在部分分區數據丟失時,Spark能夠經過這個依賴關係從新計算丟失的分區數據,而不是對RDD的全部分區進行從新計算。
  4. 對儲存鍵值對的RDD,還有一個可選的分區器
    • 只有對key-value的RDD,纔會有Partitioner,非key-value的RDD的Rartitioner的值是None
    • Partitioner不但決定了RDD的分區數量,也決定了parent RDD Shuffle輸出時的分區數量
    • 默認是HashPartitioner,還有RangePartition,自定義分區
  5. 儲存每一個分區優先位置的列表(本地計算性)
    • 好比對於一個HDFS文件來講,這個列表保存的就是每一個Partition所在文件快的位置,按照「移動數據不如移動計算」的理念,Spark在進行任務調度的時候,會盡量地將計算任務分配到其所要處理數據塊的儲存位置。

4.3 Transformation和Action算子

在Spark中,Transformation算子(也稱轉換算子),在沒有Action算子(也稱行動算子)去觸發的時候,是不會執行的,能夠理解爲懶算子,而Action算子能夠理解爲觸發算子,經常使用Action算子以下:

  • redece:經過函數彙集RDD的全部元素,先聚合分區內的數據,在聚合分區間的數據(預聚合)
  • collect:以數組的形式返回RDD中的全部元素,全部數據都會被拉到Driver端,內存開銷很大,因此慎用
  • count:返回RDD中元素個數
  • take:返回RDD中前N個元素組成的數組
  • first:返回RDD中的第一個元素,相似於tack(1)
  • takeOrdered:返回排序後的前N個元素,默認升序,數據也會拉到Driver端
  • aggregate:分區內聚合後,在分區間聚合
  • fold:aggregate簡化操做,若是分區內和分區間算法同樣,則能夠使用
  • saveAsTextFile:將數據集的元素以textFile的形式保存到HDFS文件系統或者其餘文件系統,對每一個元素,Spark都會調用toString方法轉換爲文本
  • saveAsSequenceFile:將數據集的元素以Hadoop SquenceFile的形式保存到指定目錄下,能夠是HDFS或者其餘文件系統
  • saveAsObjectFile:將RDD中的元素序列化成對象,儲存到文件中
  • countByKey:針對k-v類型RDD,返回一個Map(Key,count),能夠用來查看數據是否傾斜
  • foreach:針對RDD中的每個元素都執行一次函數,每一個函數實在Executor上執行的

經常使用Transformation算子以下:

  • map:輸入變換函數應用於RDD中全部元素,轉換其類型
  • mapPartitions:輸入變換函數應用於每一個分區中全部元素
  • mapPartitionsWithIndex:輸入變換函數應用於每一個分區中全部元素,帶有分區號
  • filter:過濾算子
  • flatMap:扁平化算子
  • sample:抽樣算子
  • union:並集算子
  • intersection:交集算子
  • distinct:去重算子
  • groupByKey:根據Key分組算子
  • reduceByKey:根據Key聚合算子
  • aggregateByKey:根據Key聚合算子
  • sortByKey:根據Key排序算子
  • join:連接算子
  • coalesce:壓縮分區算子
  • repartition:重分區算子

4.4 RDD的建立

4.4.1 從集合中建立

object Demo {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("WorkCount").setMaster("local[*]")
    val sc = new SparkContext(conf)

    /**
      * 經過parallelize方法傳入序列獲得RDD
      * 傳入分區數爲1,結果爲1	2	3	4	5	6	7	8	9	10
      * 傳入分區數大於1,結果順序不定,由於數據被打散在2個分區裏
      * */
    val rdd: RDD[Int] = sc.parallelize(1.to(10), 1)
    rdd.foreach(x => print(x + "\t"))
  }
}

4.4.2 從外部儲存建立RDD

  • 讀取textFile

WordCount案例介紹了此種用法

  • 讀取Json文件

在idea中,resources目錄下建立word.json文件

{"name": "zhangsa"}
{"name": "lisi", "age": 30}
{"name": "wangwu"}
["aa","bb"]
object Demo0 {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("json").setMaster("local[*]")
    val sc = new SparkContext(conf)
    val rdd1: RDD[String] = sc.textFile(this.getClass().getClassLoader.getResource("word.json").getPath)
    val rdd2: RDD[Option[Any]] = rdd1.map(JSON.parseFull(_))
    rdd2.foreach(println)
    /**
      * Some(Map(name -> zhangsa))
      * Some(Map(name -> wangwu))
      * Some(List(aa, bb))
      * Some(Map(name -> lisi, age -> 30.0))
      * */
  }
}
  • 讀取Object對象文件
object Demo1 {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("object").setMaster("local[*]")
    val sc = new SparkContext(conf)
    val rdd1: RDD[Int] = sc.parallelize(Array(1,2,3,4,5))
//    rdd1.saveAsObjectFile("hdfs://cdh01.cm/test")

    val rdd2: RDD[Nothing] = sc.objectFile("hdfs://cdh01.cm/test")
    rdd2.foreach(println)

    /**
      * 2
      * 5
      * 1
      * 4
      * 3
      * */
  }
}

4.4.3 從其餘RDD轉換獲得新的RDD

  • 根據RDD的數據類型的不一樣,總體分爲2種RDD:Value類型,Key-Value類型(二維元組)

map()返回一個新的RDD,該RDD是由原RDD的每一個元素通過函數轉換後的值組成,主要做用就是轉換結構。(不存在shuffle)

  • 案例一:
object Demo2 {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("map").setMaster("local[*]")
    val sc = new SparkContext(conf)

    /**
      * map算子,一共有多少元素就會執行多少次,和分區數無關
      **/
    val rdd: RDD[Int] = sc.parallelize(1.to(10), 1)
    val mapRdd: RDD[Int] = rdd.map(x => {
      println("執行") //一共被執行10次
      x * 2
    })
    val result: Array[Int] = mapRdd.collect()
    result.foreach(x => print(x + "\t")) //2	4	6	8	10	12	14	16	18	20
  }
}
  • 案例二:
object demo3 {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("mapPartitions").setMaster("local[*]")
    val sc = new SparkContext(conf)

    /**
      * mapPartitions算子,一個分區內處理,幾個分區就執行幾回,優於map函數
      **/
    val rdd: RDD[Int] = sc.parallelize(1.to(10), 2)
    val mapRdd: RDD[Int] = rdd.mapPartitions(it => {
      println("執行") //分區2次,共打印2次
      it.map(x => x * 2)
    })
    val result: Array[Int] = mapRdd.collect()
    result.foreach(x => print(x + "\t")) //2	4	6	8	10	12	14	16	18	20
  }
}
  • 案例三:
object Demo4 {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("mapPartitionsWithIndex").setMaster("local[*]")
    val sc = new SparkContext(conf)

    /**
      * mapPartitionsWithIndex算子,一個分區內處理,幾個分區就執行幾回,返回帶有分區號的結果集
      **/
    val rdd: RDD[Int] = sc.parallelize(1.to(10), 2)
    val value: RDD[(Int, Int)] = rdd.mapPartitionsWithIndex((index, it) => it.map((index, _)))
    val result: Array[(Int, Int)] = value.collect()
    result.foreach(x => print(x + "\t")) //(0,1)	(0,2)	(0,3)	(0,4)	(0,5)	(1,6)	(1,7)	(1,8)	(1,9)	(1,10)
  }
}

4.5 flatMap

扁平化(不存在shuffle)

object Demo5 {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("flatMap").setMaster("local[*]")
    val sc = new SparkContext(conf)

    val rdd: RDD[(String, Int)] = sc.parallelize(Array(("A", 1), ("B", 2), ("C", 3)))
    val map_result: RDD[String] = rdd.map(ele => ele._1 + ele._2)
    val flatMap_result: RDD[Char] = rdd.flatMap(ele => ele._1 + ele._2)
    
    /**
      * C3
      * A1
      * B2
      **/
    map_result.foreach(println)

    /**
      * B
      * A
      * C
      * 1
      * 2
      * 3
      **/
    flatMap_result.foreach(println)
  }
}

4.6 glom

將每個分區的元素合併成一個數組,造成新的RDD類型:RDD[Array[T]] (不存在shuffle)

object Demo6 {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("glom").setMaster("local[*]")
    val sc = new SparkContext(conf)

    val rdd: RDD[Int] = sc.parallelize(1.to(10), 3)
    val result: RDD[Array[Int]] = rdd.glom()

    /**
      * 1,2,3
      * 7,8,9,10
      * 4,5,6
      * */
    result.foreach(x=>{
      println(x.toList.mkString(","))
    })

  }
}

4.7 groupBy

根據條件函數分組(存在shuffle)

object Demo7 {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("groupBy").setMaster("local[*]")
    val sc = new SparkContext(conf)

    val rdd: RDD[Int] = sc.parallelize(1.to(10))
    val result1: RDD[(Int, Iterable[Int])] = rdd.groupBy(x => x % 2)
    val result2: RDD[(Boolean, Iterable[Int])] = rdd.groupBy(x => x % 2 == 0)

    /**
      * (0,CompactBuffer(2, 4, 6, 8, 10))
      * (1,CompactBuffer(1, 3, 5, 7, 9))
      **/
    result1.foreach(println)

    /**
      * (true,CompactBuffer(2, 4, 6, 8, 10))
      * (false,CompactBuffer(1, 3, 5, 7, 9))
      **/
    result2.foreach(println)
  }
}

4.8 filter

過濾(不存在shuffle)

object Demo8 {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("filter").setMaster("local[*]")
    val sc = new SparkContext(conf)

    val rdd: RDD[Int] = sc.parallelize(1.to(10))
    val result: RDD[Int] = rdd.filter(x => x % 2 == 0)
    result.foreach(x => print(x + "\t"))  //6	10	8	4	2	
  }
}

4.9 sample

sample(withReplacement,fraction,seed)抽樣,經常使用在解決定位大key問題

  • 以指定的隨機種子隨機抽樣出比例爲fraction的數據(抽取到的數量是size*fraction),注意:獲得的結果並不能保證準確的比例,也就是說fraction只決定了這個數被選中的比率,並非從數據中抽出多少百分比的數據,決定的不是個數,而是比率。
  • withReplacement表示抽出的數據是否放回,true爲有放回抽樣,flase爲無放回抽樣,放回表示數據有可能會被重複抽取到,false則不可能重複抽取到,若是爲false則fraction必須在[0,1]內,是true則大於0便可。
  • seed用於指定隨機數生成器種子,通常默認的,或者傳入當前的時間戳,(若是傳入定值,每次取出結果同樣)
object Demo9 {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("sample").setMaster("local[*]")
    val sc = new SparkContext(conf)

    val rdd: RDD[Int] = sc.parallelize(1.to(10))

    /**
      * 不放回抽樣
      * 從結果中能夠看出,抽出結果沒有重複
      * */
    val result1: RDD[Int] = rdd.sample(false,0.5)
    result1.foreach(println)
    /**
      * 放回抽樣
      * 從結果中能夠看出,抽出結果有重複
      * */
    val result2: RDD[Int] = rdd.sample(true,2)
    result2.foreach(println)
  }
}

4.10 distinct

distinct([numTasks])去重,參數表示任務數量,默認值和分區數保持一致(不存在shuffle)

object Demo10 {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("distinct").setMaster("local[*]")
    val sc = new SparkContext(conf)

    val rdd: RDD[Int] = sc.parallelize(Array(1,2,3,4,2,3,4,3,4,5))
    val result: RDD[Int] = rdd.distinct(2)
    result.foreach(println)
  }
}

4.11 coalesce

coalesce(numPatitions)縮減,縮減分區到指定數量,用於大數據集過濾後,提升小數據集的執行效率,只能減不能加。(不存在shuffle)

object Demo11 {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("coalesce").setMaster("local[*]")
    val sc = new SparkContext(conf)

    val rdd: RDD[Int] = sc.parallelize(1.to(10),5)
    println(rdd.partitions.length)  //5
    val result: RDD[Int] = rdd.coalesce(2)
    println(result.partitions.length)  //2
  }
}

4.12 repartition

repartition(numPatitions)更改分區,更改分區到指定數量,可加可減,可是減小仍是使用coalesce,將這個理解爲增長。(存在shuffle)

object Demo12 {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("repartition").setMaster("local[*]")
    val sc = new SparkContext(conf)

    val rdd: RDD[Int] = sc.parallelize(1.to(10),2)
    println(rdd.partitions.length)  //2
    val result: RDD[Int] = rdd.repartition(5)
    println(result.partitions.length)  //5
  }
}

4.13 sortBy

排序(存在shuffle)

object Demo13 {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("sortBy").setMaster("local[*]")
    val sc = new SparkContext(conf)

    val rdd: RDD[Int] = sc.parallelize(Array(4, 2, 3, 1, 5), 1)
    val result1: RDD[Int] = rdd.sortBy(x => x, false)
    result1.foreach(x => print(x + "\t"))  //5	4	3	2	1
    val result2: RDD[Int] = rdd.sortBy(x => x, true)
    result2.foreach(x => print(x + "\t"))  //1	2	3	4	5
  }
}

4.14 RDD與RDD互交

  • 並集:union
  • 差集:subtract
  • 交集:intersection
  • 笛卡爾積:cartesian
  • 拉鍊:zip
object Demo14 {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("RDD AND RDD").setMaster("local[*]")
    val sc = new SparkContext(conf)

    val rdd1: RDD[Int] = sc.parallelize(1.to(5))
    val rdd2: RDD[Int] = sc.parallelize(3.to(8))

    //並集
    rdd1.union(rdd2).collect().foreach(x => print(x + "\t"))  //1	2	3	4	5	3	4	5	6	7	8
    //差集
    rdd1.subtract(rdd2).collect().foreach(x => print(x + "\t")) //1	2
    //交集
    rdd1.intersection(rdd2).collect().foreach(x => print(x + "\t")) //3	4	5\
    //笛卡爾積
    /*(1,3)	(1,4)	(1,5)	(1,6)	(1,7)	(1,8)
      (2,3)	(2,4)	(2,5)	(2,6)	(2,7)	(2,8)
      (3,3)	(3,4)	(3,5) (3,6) (3,7)	(3,8)
      (4,3)	(4,4)	(4,5)	(4,6)	(4,7)	(4,8)
      (5,3)	(5,4)	(5,5)	(5,6)	(5,7)	(5,8)*/
    rdd1.cartesian(rdd2).collect().foreach(x => print(x + "\t"))
    //拉鍊:必須保證RDD分區元素數量相同
    val rdd3: RDD[Int] = sc.parallelize(1.to(5))
    val rdd4: RDD[Int] = sc.parallelize(2.to(6))
    rdd3.zip(rdd4).collect().foreach(x => print(x + "\t"))  //(1,2)	(2,3)	(3,4)	(4,5)	(5,6)
  }
}

4.15 k-v類型 partitionBy

大多數Spark算子均可以用在任意類型的RDD上,可是有一些比較特殊的操做只能用在key-value類型的RDD上

使用HashPartitioner分區器

object Demo15 {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("partitionBy").setMaster("local[*]")
    val sc = new SparkContext(conf)

    val rdd1: RDD[String] = sc.parallelize(Array("hello", "hadooop", "hello", "spark"), 1)
    val rdd2: RDD[(String, Int)] = rdd1.map((_, 1))
    println(rdd2.partitions.length) //1
    println(rdd2.partitioner) //None
    val rdd3: RDD[(String, Int)] = rdd2.partitionBy(new HashPartitioner(2))
    println(rdd3.partitions.length) //2
    println(rdd3.partitioner) //Some(org.apache.spark.HashPartitioner@2)
    val result: RDD[(Int, (String, Int))] = rdd3.mapPartitionsWithIndex((index, it) => {
      it.map(x => (index, (x._1, x._2)))
    })
    result.foreach(println)

    /**
      * (1,(spark,1))
      * (0,(hello,1))
      * (0,(hadooop,1))
      * (0,(hello,1))
      **/
  }
}

自定義分區器

object Demo16 {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("partitionBy").setMaster("local[*]")
    val sc = new SparkContext(conf)

    val rdd1: RDD[String] = sc.parallelize(Array("hello", "hadooop", "hello", "spark"), 1)
    val rdd2: RDD[(String, Int)] = rdd1.map((_, 1))
    println(rdd2.partitions.length) //1
    println(rdd2.partitioner) //None

    val rdd3: RDD[(String, Int)] = rdd2.partitionBy(new MyPatitioner(2))
    println(rdd3.partitions.length) //2
    println(rdd3.partitioner) //Some(com.test.sparkcore.MyPatitioner@769a58e5)
    val result: RDD[(Int, (String, Int))] = rdd3.mapPartitionsWithIndex((index, it) => {
      it.map(x => (index, (x._1, x._2)))
    })
    result.foreach(println)

    /**
      * (0,(hadooop,1))
      * (1,(hello,1))
      * (0,(spark,1))
      * (1,(hello,1))
      **/
  }
}

class MyPatitioner(num: Int) extends Partitioner {
  override def numPartitions: Int = num

  override def getPartition(key: Any): Int = {
    System.identityHashCode(key) % num.abs
  }
}

4.16 k-v類型 reduceByKey

reduceByKey(V , V)=>V 根據key進行聚合,在shuffle以前會有combine(預聚合)操做

object Demo17 {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("reduceByKey").setMaster("local[*]")
    val sc = new SparkContext(conf)

    val rdd1: RDD[String] = sc.parallelize(Array("hello", "hadooop", "hello", "spark"), 1)
    val rdd2: RDD[(String, Int)] = rdd1.map((_, 1))
    val result: RDD[(String, Int)] = rdd2.reduceByKey(_ + _)
    result.foreach(x => print(x + "\t"))  //(spark,1)	(hadooop,1)	(hello,2)
  }
}

4.17 k-v類型 groupByKey

根據key進行分組,直接shuffle

object Demo18 {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("groupByKey").setMaster("local[*]")
    val sc = new SparkContext(conf)

    val rdd1: RDD[String] = sc.parallelize(Array("hello", "hadooop", "hello", "spark"), 1)
    val rdd2: RDD[(String, Int)] = rdd1.map((_, 1))
    val result: RDD[(String, Iterable[Int])] = rdd2.groupByKey()
    result.foreach(x => print(x + "\t"))  //(spark,CompactBuffer(1))	(hadooop,CompactBuffer(1))	(hello,CompactBuffer(1, 1))
    result.map(x=>(x._1,x._2.size)).foreach(x => print(x + "\t")) 	//(spark,1)	(hadooop,1)	(hello,2)      
  }
}

4.18 k-v類型 aggrateByKey

aggrateByKey(zero : U)(( U , V )=>U , (U , U)=>U)

基於Key分組而後去聚合的操做,耗費資源太多,這時能夠使用reduceByKey或aggrateByKey算子去提升性能

aggrateByKey分區內聚合,後在進行shuffle聚合。

object Demo19 {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("aggregateByKey").setMaster("local[*]")
    val sc = new SparkContext(conf)

    val rdd1: RDD[String] = sc.parallelize(Array("hello", "hadooop", "hello", "spark"), 1)
    val rdd2: RDD[(String, Int)] = rdd1.map((_, 1))
    val result: RDD[(String, Int)] = rdd2.aggregateByKey(0)(_ + _, _ + _)
    result.foreach(x => print(x + "\t")) //(spark,1)	(hadooop,1)	(hello,2)
  }
}

4.19 k-v類型 foldByKey

foldByKey(zero : V)((V , V)=>V) 摺疊計算,沒有aggrateByKey靈活,若是分區內和分區外聚合計算不同,則不行

object Demo20 {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("foldByKey").setMaster("local[*]")
    val sc = new SparkContext(conf)

    val rdd1: RDD[String] = sc.parallelize(Array("hello", "hadooop", "hello", "spark"), 1)
    val rdd2: RDD[(String, Int)] = rdd1.map((_, 1))
    val result: RDD[(String, Int)] = rdd2.foldByKey(0)(_+_)
    result.foreach(x => print(x + "\t")) //(spark,1)	(hadooop,1)	(hello,2)
  }
}

4.20 k-v類型 combineByKey

combineByKey(V=>U,(U , V)=>U , (U , U)=>U) 根據Key組合計算

object Demo21 {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("combineByKey").setMaster("local[*]")
    val sc = new SparkContext(conf)

    val rdd1: RDD[String] = sc.parallelize(Array("hello", "hadooop", "hello", "spark"), 1)
    val rdd2: RDD[(String, Int)] = rdd1.map((_, 1))
    val result: RDD[(String, Int)] = rdd2.combineByKey(v => v, (c: Int, v: Int) => c + v, (c1: Int, c2: Int) => c1 + c2)
    result.foreach(x => print(x + "\t"))  //(spark,1)	(hadooop,1)	(hello,2)
  }
}

4.21 k-v類型 sortByKey

根據Key排序

object Demo22 {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("sortByKey").setMaster("local[*]")
    val sc = new SparkContext(conf)

    val rdd1: RDD[String] = sc.parallelize(Array("ahello", "bhadooop", "chello", "dspark"), 1)
    val rdd2: RDD[(String, Int)] = rdd1.map((_, 1))
    rdd2.sortByKey(false).foreach(x => print(x + "\t")) //(dspark,1)	(chello,1)	(bhadooop,1)	(ahello,1)
  }
}

4.22 k-v類型 mapValues

只對value操做的map轉換操做

object Demo23 {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("mapValues").setMaster("local[*]")
    val sc = new SparkContext(conf)

    val rdd1: RDD[String] = sc.parallelize(Array("hello", "hadooop", "hello", "spark"), 1)
    val rdd2: RDD[(String, Int)] = rdd1.map((_, 1))
    rdd2.mapValues(x => x + 1).foreach(x => print(x + "\t")) //(hello,2)	(hadooop,2)	(hello,2)	(spark,2)
  }
}

4.23 k-v類型 join

object Demo24 {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("join").setMaster("local[*]")
    val sc = new SparkContext(conf)

    val rdd1: RDD[(String, Int)] = sc.parallelize(Array(("a",10),("b",10),("a",20),("d",10)))
    val rdd2: RDD[(String, Int)] = sc.parallelize(Array(("a",30),("b",20),("c",10)))
    //內鏈接 (a,(10,30))	(b,(10,20))	(a,(20,30))
    rdd1.join(rdd2).foreach(x => print(x + "\t"))

    //左連接(b,(10,Some(20)))	(d,(10,None))	(a,(10,Some(30)))	(a,(20,Some(30)))
    rdd1.leftOuterJoin(rdd2).foreach(x => print(x + "\t"))

    //右連接(c,(None,10))	(a,(Some(10),30))	(b,(Some(10),20))	(a,(Some(20),30))
    rdd1.rightOuterJoin(rdd2).foreach(x => print(x + "\t"))
    
    //全連接(b,(Some(10),Some(20)))	(c,(None,Some(10)))	(d,(Some(10),None))	(a,(Some(10),Some(30)))	(a,(Some(20),Some(30)))
    rdd1.fullOuterJoin(rdd2).foreach(x => print(x + "\t"))
  }
}

4.24 k-v類型 cogroup

根據Key聚合RDD

object Demo25 {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("cogroup").setMaster("local[*]")
    val sc = new SparkContext(conf)

    val rdd1: RDD[(String, Int)] = sc.parallelize(Array(("a",10),("b",10),("a",20),("d",10)))
    val rdd2: RDD[(String, Int)] = sc.parallelize(Array(("a",30),("b",20),("c",10)))

    /**
      * (c,(CompactBuffer(),CompactBuffer(10)))
      * (b,(CompactBuffer(10),CompactBuffer(20)))
      * (a,(CompactBuffer(10, 20),CompactBuffer(30)))
      * (d,(CompactBuffer(10),CompactBuffer()))
      */
    rdd1.cogroup(rdd2).foreach(println)
  }
}

4.25 keyo序列化

在分佈式應用中,常常會進行IO操做,傳遞對象,而網絡傳輸過程當中就必需要序列化。

Java序列化能夠序列化任何類,比較靈活,可是至關慢,而且序列化後對象的提交也比較大。

Spark出於性能考慮,在2.0之後,開始支持kryo序列化機制,速度是Serializable的10倍以上,當RDD在Shuffle數據的時候,簡單數據類型,簡單數據類型數組,字符串類型已經使用kryo來序列化。

object Demo26 {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("keyo")
      .setMaster("local[*]")
      //替換默認序列化機制
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      //註冊須要使用的kryo序列化自定義類
      .registerKryoClasses(Array(classOf[MySearcher]))

    val sc = new SparkContext(conf)

    val rdd1: RDD[String] = sc.parallelize(Array("hadoop yarn", "hadoop hdfs", "c"))
    val rdd2: RDD[String] = MySearcher("hadoop").getMathcRddByQuery(rdd1)
    rdd2.foreach(println)
  }
}

case class MySearcher(val query: String) {
  def getMathcRddByQuery(rdd: RDD[String]): RDD[String] = {
    rdd.filter(x => x.contains(query))
  }
}

4.26 依賴

  • 窄依賴:(不會shuffle)

    • 若是RDD2由RDD1計算獲得,則RDD2就是子RDD,RDD1就是父RDD
    • 若是依賴關係在設計的時候就能夠肯定,而不須要考慮父RDD分區中的記錄,而且父RDD中的每一個分區最多隻有一個子分區,這就叫窄依賴
    • 父RDD的每一個分區中的數據最多被一個子RDD的分區使用
  • 寬依賴:(會shuffle)

    • 寬依賴每每對應着shuffle操做,須要在運行過程當中將同一個父RDD的分區傳入到不一樣的子RDD分區中。
    • 對於寬依賴,重算的父RDD分區對應多個子RDD分區,這樣實際上父RDD 中只有一部分的數據是被用於恢復這個丟失的子RDD分區的,另外一部分對應子RDD的其它未丟失分區,這就形成了多餘的計算;
    • 寬依賴中子RDD分區一般來自多個父RDD分區,極端狀況下,全部的父RDD分區都要進行從新計算。

4.27 持久化

object Demo27 {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("cache").setMaster("local[*]")
    val sc = new SparkContext(conf)

    val rdd1: RDD[String] = sc.parallelize(Array("a", "b", "c"))
    val rdd2: RDD[String] = rdd1.flatMap(x => {
      println("執行flatMap操做")
      x.split("")
    })
    val rdd3: RDD[(String, Int)] = rdd2.map((_, 1))

    /** 持久化到內存 */
    //rdd3.cache() //持久化到內存
    /**
      * 持久化到磁盤
      * DISK_ONLY:持久化到磁盤
      * DISK_ONLY_2:持久化到磁盤而且存一個副本(2個文件)
      * MEMORY_ONLY:持久化到內存
      * MEMORY_ONLY_2:持久化到內存而且存一個副本(2個文件)
      * MEMORY_ONLY_SER:持久化到內存,而且序列化
      * MEMORY_ONLY_SER_2:持久化到內存,而且序列化,還要存一個副本(2個文件)
      * MEMORY_AND_DISK:持久化到內存和磁盤
      * MEMORY_AND_DISK_2:持久化到內存和磁盤而且存一個副本(2個文件)
      * MEMORY_AND_DISK_SER:持久化到內存和磁盤,而且序列化
      * MEMORY_AND_DISK_SER_2:持久化到內存和磁盤,而且序列化,還要存一個副本(2個文件)
      * OFF_HEAP:持久化在堆外內存中,Spark本身管理的內存
      * */
    rdd3.persist(StorageLevel.DISK_ONLY) //持久化到磁盤

    rdd3.collect.foreach(x => print(x + "\t"))
    println("------------")
    //輸出語句不會執行
    rdd3.collect.foreach(x => print(x + "\t"))
  }
}

4.28 checkpoint

持久化只是將數據保存在BlockManager中,而RDD的Lineage是不變的,可是checkpoint執行完後,RDD已經沒有以前所謂的依賴了,而只是一個強行爲其設定的checkpointRDD,RDD的Lineage改變了。

持久化的數據丟失可能性更大,磁盤、內存都有可能會存在數據丟失狀況。可是checkpoint的數據一般是儲存在如HDFS等容錯、高可用的文件系統,數據丟失可能性較小。

默認狀況下,若是某個RDD沒有持久化,可是設置了checkpoint Job想要將RDD的數據寫入文件系統,須要所有從新計算一次,再將計算出來的RDD數據checkpoint到文件系統,因此,建議對checkpoint的RDD使用十九畫,這樣RDD只須要計算一次就能夠了。

object Demo28 {
  def main(args: Array[String]): Unit = {
    //設置當前用戶
    System.setProperty("HADOOP_USER_NAME", "Heaton")
    val conf = new SparkConf().setAppName("checkpoint").setMaster("local[*]")
    val sc = new SparkContext(conf)

    //設置checkpoint目錄
    sc.setCheckpointDir("hdfs://cdh01.cm:8020/test")
    val rdd1: RDD[String] = sc.parallelize(Array("abc"))
    val rdd2: RDD[(String, Int)] = rdd1.map((_, 1))

    /**
      * 標記RDD2的checkpoint
      * RDD2會被保存到文件中,而且會切斷到父RDD的引用,該持久化操做,必須在job運行以前調用
      * 若是不進行持久化操做,那麼在保存到文件的時候須要從新計算
      **/
    rdd2.cache()
    rdd2.collect.foreach(x => print(x + "\t"))
    rdd2.collect.foreach(x => print(x + "\t"))
  }
}

4.29 累加器

4.29.1 累加器問題拋出

object Demo29 {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("Accumulator").setMaster("local[*]")
    val sc = new SparkContext(conf)
    val rdd1: RDD[Int] = sc.parallelize(Array(1, 2, 3, 4, 5), 2)
    var a = 1
    rdd1.foreach(x => {
      a += 1
      println("rdd:  "+a)
    })
    println("-----")
    println("main:  "+a)

    /**
      * rdd:  2
      * rdd:  2
      * rdd:  3
      * rdd:  3
      * rdd:  4
      * -----
      * main:  1
      * */
  }
}

從上面能夠看出,2個問題:

  1. 變量是在RDD分區中進行累加,而且2個RDD分區中的變量不一樣
  2. 最後並無main方法中的變量值改變

考慮到main方法中的a變量是在Driver端,而RDD分區又是在Excutor端進行計算,因此只是拿了一個Driver端的鏡像,並且不一樣步回Driver端

在實際開發中,咱們須要進行這種累加,這時就用到了累加器

4.29.2 累加器案例

Spark提供了一些經常使用累加器,主要針對值類型

object Demo30 {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("Accumulator").setMaster("local[*]")
    val sc = new SparkContext(conf)
    val rdd1: RDD[Int] = sc.parallelize(Array(1, 2, 3, 4, 5), 2)
    val acc: util.LongAccumulator = sc.longAccumulator("acc")
    rdd1.foreach(x => {
      acc.add(1)
      println("rdd:  "+acc.value)
    })
    println("-----")
    println("main:  "+acc.count)

    /**
      * rdd:  1
      * rdd:  1
      * rdd:  2
      * rdd:  2
      * rdd:  3
      * -----
      * main:  5
      * */
  }
}

如上代碼,咱們發現累加器是分區內先累加,再分區間累加

4.29.3 自定義累加器

  • 案例一:自定義Int累加器
object Demo31 {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("Accumulator").setMaster("local[*]")
    val sc = new SparkContext(conf)
    val rdd1: RDD[Int] = sc.parallelize(Array(1, 2, 3, 4, 5), 2)
    val acc = new MyAccumulator
    //註冊累加器
    sc.register(acc)

    rdd1.foreach(x => {
      acc.add(1)
      println("rdd:  " + acc.value)
    })
    println("-----")
    println("main:  " + acc.value)

    /**
      * rdd:  1
      * rdd:  1
      * rdd:  2
      * rdd:  3
      * rdd:  2
      * -----
      * main:  5
      **/
  }
}

class MyAccumulator extends AccumulatorV2[Int, Int] {
  var sum: Int = 0

  //判斷累加的值是否是空
  override def isZero: Boolean = sum == 0

  //如何把累加器copy到Executor
  override def copy(): AccumulatorV2[Int, Int] = {
    val accumulator = new MyAccumulator
    accumulator.sum = sum
    accumulator
  }

  //重置值
  override def reset(): Unit = {
    sum = 0
  }

  //分區內的累加
  override def add(v: Int): Unit = {
    sum += v
  }

  //分區間的累加,累加器最終的值
  override def merge(other: AccumulatorV2[Int, Int]): Unit = {
    other match {
      case o: MyAccumulator => this.sum += o.sum
      case _ =>
    }
  }

  override def value: Int = this.sum
}
  • 案例二:自定義map平均值累加器
object Demo32 {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("partitionBy").setMaster("local[*]")
    val sc = new SparkContext(conf)
    val rdd1: RDD[Int] = sc.parallelize(Array(1, 2, 3, 4, 5), 2)
    val acc = new MyAccumulator
    //註冊累加器
    sc.register(acc)

    rdd1.foreach(x => {
      acc.add(x)
    })
    println("main:  " + acc.value)

    /**main:  Map(sum -> 15.0, count -> 17.0, avg -> 0.8823529411764706) */
  }
}

class MyAccumulator extends AccumulatorV2[Int, Map[String, Double]] {
  var map: Map[String, Double] = Map[String, Double]()

  //判斷累加的值是否是空
  override def isZero: Boolean = map.isEmpty

  //如何把累加器copy到Executor
  override def copy(): AccumulatorV2[Int, Map[String, Double]] = {
    val accumulator = new MyAccumulator
    accumulator.map ++= map
    accumulator
  }

  //重置值
  override def reset(): Unit = {
    map = Map[String, Double]()
  }

  //分區內的累加
  override def add(v: Int): Unit = {
    map += "sum" -> (map.getOrElse("sum", 0d) + v)
    map += "count" -> (map.getOrElse("sum", 0d) + 1)
  }

  //分區間的累加,累加器最終的值
  override def merge(other: AccumulatorV2[Int, Map[String, Double]]): Unit = {
    other match {
      case o: MyAccumulator =>
        this.map += "sum" -> (map.getOrElse("sum", 0d) + o.map.getOrElse("sum", 0d))
        this.map += "count" -> (map.getOrElse("count", 0d) + o.map.getOrElse("count", 0d))
      case _ =>
    }
  }

  override def value: Map[String, Double] = {
    map += "avg" -> map.getOrElse("sum", 0d) / map.getOrElse("count", 1d)
    map
  }
}

4.30 廣播變量

廣播變量在每一個節點上保存一個只讀的變量的緩存,而不用給每一個task來傳送一個copy

object Demo33 {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("partitionBy").setMaster("local[*]")
    val sc = new SparkContext(conf)

    val rdd: RDD[String] = sc.parallelize(Array("a", "b"))
    val broadArr: Broadcast[Array[Int]] = sc.broadcast(Array(1, 2))
    rdd.foreach(x => {
      val value: Array[Int] = broadArr.value
      println(value.toList)
    })
    /**
      * List(1, 2)
      * List(1, 2)
      * */
  }
}

5 Spark SQL

Spark SQL是Spark用於結構化數據處理的Spark模塊。如:Mysql,Hbase,Hive

Spark SQL將SQL轉換成RDD,而後提交到集羣執行,執行效率很是快,並且使只會寫SQL的同窗能夠直接開發

Spark SQL提供了2個編程抽象,等同於Spark Core中的RDD,分別是:DataFrame,DataSet

5.1 DataFrame

與RDD相似,DataFrame是一個分佈式的數據容器

DataFrame更像是傳統數據庫的二維表格,除了數據之外,還記錄了數據的結構信息(Schema)

與Hive相似,DataFrame也支持嵌套數據類型(Struct、Array、Map)

  • 底層架構

  • Predicate Pushdown 機制

5.2 DataSet

DataSet是DataFrame的一個擴展,是SparkSQL1.6後新增的數據抽象,API友好

scala樣例類支持很是好,用樣例類在DataSet中定義數據結構信息,樣例類中每一個屬性的沒成直接映射到DataSet中的字段名稱。

DataFrame是DataSet的特例,DataFrame=DataSet[Row],能夠經過as方法將DataFrame轉換成DataSet,Row是一個類型,能夠是Person、Animal,全部的表結構信息都用Row來表示

DataFrame只知道字段,不知道字段類型,而DataSet不只知道字段,還知道類型。

DataSet具備強類型的數據集合,須要提供對應的類型信息。

5.3 SparkSession

從Spark2.0開始,SparkSession是Spark新的查詢起始點,其內部封裝了SparkContext,因此計算其實是由SparkContext完成

5.4 DataFrame編程

5.4.1 解析Json數據

  • 讀取Json文件

在idea中,resources目錄下建立student.json文件

{"id":1,"name": "zhangsa", "age": 10}
{"id":2,"name": "lisi", "age": 20}
{"id":3,"name": "wangwu", "age": 30}
{"id":4,"name": "zhaoliu", "age": 12}
{"id":5,"name": "hahaqi", "age": 24}
{"id":6,"name": "xixiba", "age": 33}
object SparkSQLDemo1 {
  def main(args: Array[String]): Unit = {
    //建立sparksession
    val spark = SparkSession.builder().appName("demo1").master("local[*]") getOrCreate()
    val frame: DataFrame = spark.read.json(this.getClass.getClassLoader.getResource("student.json").getPath)
    frame.show(100)
    /**
      * +---+---+-------+
      * |age| id|   name|
      * +---+---+-------+
      * | 10|  1|zhangsa|
      * | 20|  2|   lisi|
      * | 30|  3| wangwu|
      * | 12|  4|zhaoliu|
      * | 24|  5| hahaqi|
      * | 33|  6| xixiba|
      * +---+---+-------+
      */
    println(frame.schema)
    /**
      * StructType(StructField(age,LongType,true), StructField(id,LongType,true), StructField(name,StringType,true))
      */
  }
}

5.4.2 TempView

  • 在使用sql查詢以前須要註冊臨時視圖
    • createTempView():註冊視圖,當前Session有效
    • createOrReplaceTempView():註冊視圖,當前Session有效,若是已經存在,那麼替換
    • createGlobalTempView():註冊全局視圖,在全部Session中生效
    • createOrReplaceGlobalTempView():註冊全局視圖,在全部Session中生效,若是已經存在,那麼替換

使用全局視圖,須要在表名前添加global_tmp,如student表,寫法爲:global_tmp.student

object SparkSQLDemo2 {
  def main(args: Array[String]): Unit = {
    //建立sparksession
    val spark = SparkSession.builder().appName("demo2").master("local[*]") getOrCreate()
    val frame: DataFrame = spark.read.json(this.getClass.getClassLoader.getResource("student.json").getPath)

    frame.createOrReplaceTempView("student")
    val result: DataFrame = spark.sql("select * from student where age >= 20")
    result.show()
    /**
      * +---+---+------+
      * |age| id|  name|
      * +---+---+------+
      * | 20|  2|  lisi|
      * | 30|  3|wangwu|
      * | 24|  5|hahaqi|
      * | 33|  6|xixiba|
      * +---+---+------+
      */
  }
}

5.5 DataSet編程

  • DataSet簡單使用
object SparkSQLDemo3 {
  def main(args: Array[String]): Unit = {
    //建立sparksession
    val spark = SparkSession.builder().appName("demo3").master("local[*]") getOrCreate()
    import spark.implicits._
    val sRDD: Dataset[Student] = Seq(Student(1,"zhangsan",15),Student(2,"lisi",16)).toDS
    sRDD.foreach(s=>{
      println(s.name+":"+s.age)
    })
    /**
      * zhangsan:15
      * lisi:16
      * */
  }
}

case class Student(id: Long, name: String, age: Long)

5.6 DataSet和DataFrame和RDD互相轉換

涉及到RDD,DataFrame,DataSet之間操做時,須要隱式轉換導入: import spark.implicits._ 這裏的spark不是報名,而是表明了SparkSession的那個對象名,因此必須先建立SparkSession對象在導入

RDD轉DF:toDF

RDD轉DS:toDS

DF轉RDD:rdd

DS轉RDD:rdd

DS轉DF:toDF

DF轉DS:as

  • 建立student.csv文件
1,zhangsa,10
2,lisi,20
3,wangwu,30
object SparkSQLDemo4 {
  def main(args: Array[String]): Unit = {
    //建立sparksession
    val spark = SparkSession.builder().appName("demo4").master("local[*]") getOrCreate()
    import spark.implicits._

    val rdd: RDD[String] = spark.sparkContext.textFile(this.getClass.getClassLoader.getResource("student.csv").getPath)
    val studentRDD: RDD[Student] = rdd.map(x => {
      val arr: Array[String] = x.split(",")
      Student(arr(0).toLong, arr(1), arr(2).toLong)
    })
    /** 1. RDD轉DF
      * +---+-------+---+
      * | id|   name|age|
      * +---+-------+---+
      * |  1|zhangsa| 10|
      * |  2|   lisi| 20|
      * |  3| wangwu| 30|
      * +---+-------+---+
      * */
    val df1: DataFrame = studentRDD.toDF()
    df1.show()
    /** 2. RDD轉DS
      * +---+-------+---+
      * | id|   name|age|
      * +---+-------+---+
      * |  1|zhangsa| 10|
      * |  2|   lisi| 20|
      * |  3| wangwu| 30|
      * +---+-------+---+
      * */
    val ds1: Dataset[Student] = studentRDD.toDS()
    ds1.show()

    /** 3. DF轉RDD
      * List([1,zhangsa,10], [2,lisi,20], [3,wangwu,30])
      * */
    val rdd1: RDD[Row] = df1.rdd
    println(rdd1.collect.toList)

    /** 4. DS轉RDD
      * List(Student(1,zhangsa,10), Student(2,lisi,20), Student(3,wangwu,30))
      * */
    val rdd2: RDD[Student] = ds1.rdd
    println(rdd2.collect.toList)

    /** 5. DS轉DF
      * +---+-------+---+
      * | id|   name|age|
      * +---+-------+---+
      * |  1|zhangsa| 10|
      * |  2|   lisi| 20|
      * |  3| wangwu| 30|
      * +---+-------+---+
      * */
    val df2: DataFrame = ds1.toDF()
    df2.show()

    /** 6. DF轉DS
      * +---+-------+---+
      * | id|   name|age|
      * +---+-------+---+
      * |  1|zhangsa| 10|
      * |  2|   lisi| 20|
      * |  3| wangwu| 30|
      * +---+-------+---+
      * */
    val ds2: Dataset[Student] = df2.as[Student]
    ds2.show()
  }
}

case class Student(id: Long, name: String, age: Long)

5.7 UDF函數:一對一

object SparkSQLDemo5 {
  def main(args: Array[String]): Unit = {
    //建立sparksession
    val spark = SparkSession.builder().appName("demo5").master("local[*]") getOrCreate()
    //註冊函數 
    val toUpper: UserDefinedFunction = spark.udf.register("toUpper", (s: String) => s.toUpperCase)

    val frame: DataFrame = spark.read.json(this.getClass.getClassLoader.getResource("student.json").getPath)
    frame.createOrReplaceTempView("student")

    val result: DataFrame = spark.sql("select id,toUpper(name),age from student where age >= 20")
    result.show()

    /**
      * +---+-----------------+---+
      * | id|UDF:toUpper(name)|age|
      * +---+-----------------+---+
      * |  2|             LISI| 20|
      * |  3|           WANGWU| 30|
      * |  5|           HAHAQI| 24|
      * |  6|           XIXIBA| 33|
      * +---+-----------------+---+
      **/
  }
}

5.8 UDAF函數:多對一

object SparkSQLDemo6 {
  def main(args: Array[String]): Unit = {
    //建立sparksession
    val spark = SparkSession.builder().appName("demo6").master("local[*]") getOrCreate()
    //註冊函數
    spark.udf.register("MyAvg", new MyAvg)

    val frame: DataFrame = spark.read.json(this.getClass.getClassLoader.getResource("student.json").getPath)
    frame.createOrReplaceTempView("student")
    frame.printSchema()
    val result: DataFrame = spark.sql("select sum(age),count(1),MyAvg(age) from student")
    result.show()

    /**
      * +--------+--------+----------+
      * |sum(age)|count(1)|myavg(age)|
      * +--------+--------+----------+
      * |     129|       6|      21.5|
      * +--------+--------+----------+
      * */
  }
}

class MyAvg extends UserDefinedAggregateFunction {
  //輸入數據類型
  override def inputSchema: StructType = StructType(StructField("input", LongType) :: Nil)

  //緩衝區中值的類型
  override def bufferSchema: StructType = StructType(StructField("sum", DoubleType) :: StructField("count", LongType) :: Nil)

  //最終輸出數據類型
  override def dataType: DataType = DoubleType

  //輸入和輸出之間的肯定性,通常都是true
  override def deterministic: Boolean = true

  //緩衝區中值的初始化
  override def initialize(buffer: MutableAggregationBuffer): Unit = {
    //sum
    buffer(0) = 0.0d
    //count
    buffer(1) = 0L
  }

  //分區內聚合
  override def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
    //若是值不爲空
    if (!input.isNullAt(0)) {
      buffer(0) = buffer.getDouble(0) + input.getLong(0)
      buffer(1) = buffer.getLong(1) + 1
    }
  }

  //分區間聚合
  override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
    //若是值不爲空
    if (!buffer2.isNullAt(0)) {
      buffer1(0) = buffer1.getDouble(0) + buffer2.getDouble(0)
      buffer1(1) = buffer1.getLong(1) + buffer2.getLong(1)
    }
  }

  //最終輸出的值
  override def evaluate(buffer: Row): Any = {
    new DecimalFormat(".00").format(buffer.getDouble(0) / buffer.getLong(1)).toDouble
  }
}

5.9 UDTF函數:一對多

須要使用Hive的UDTF

import java.util.ArrayList
import org.apache.hadoop.hive.ql.exec.{UDFArgumentException, UDFArgumentLengthException}
import org.apache.hadoop.hive.ql.udf.generic.GenericUDTF
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory
import org.apache.hadoop.hive.serde2.objectinspector.{ObjectInspector, ObjectInspectorFactory, StructObjectInspector}
import org.apache.spark.sql.{DataFrame, SparkSession}

object SparkSQLDemo7 {
  def main(args: Array[String]): Unit = {
    //建立sparksession
    val spark = SparkSession.builder()
      .appName("demo7")
      .master("local[*]")
      .enableHiveSupport() //啓用hive
      .getOrCreate()

    import spark.implicits._

    //註冊utdf算子,這裏沒法使用sparkSession.udf.register(),注意包全路徑
    spark.sql("CREATE TEMPORARY FUNCTION MySplit as 'com.xx.xx.MySplit'")


    val frame: DataFrame = spark.sparkContext.parallelize(Array("a,b,c,d")).toDF("word")
    frame.createOrReplaceTempView("test")
    val result: DataFrame = spark.sql("select MySplit(word,',') from test")
    result.show()

    /**
      * +----+
      * |col1|
      * +----+
      * |   a|
      * |   b|
      * |   c|
      * |   d|
      * +----+
      */
  }
}

class MySplit extends GenericUDTF {

  override def initialize(args: Array[ObjectInspector]): StructObjectInspector = {
    if (args.length != 2) {
      throw new UDFArgumentLengthException("UserDefinedUDTF takes only two argument")
    }
    if (args(0).getCategory() != ObjectInspector.Category.PRIMITIVE) {
      throw new UDFArgumentException("UserDefinedUDTF takes string as a parameter")
    }

    //列名,會被用戶傳遞的覆蓋
    val fieldNames: ArrayList[String] = new ArrayList[String]()
    fieldNames.add("col1")

    //返回列以什麼格式輸出,這裏是string,添加幾個就是幾個列,和上面的名字個數對應個數。
    var fieldOIs: ArrayList[ObjectInspector] = new ArrayList[ObjectInspector]()
    fieldOIs.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector)

    ObjectInspectorFactory.getStandardStructObjectInspector(fieldNames, fieldOIs)
  }

  override def process(objects: Array[AnyRef]): Unit = {
    //獲取數據
    val data: String = objects(0).toString
    //獲取分隔符
    val splitKey: String = objects(1).toString()
    //切分數據
    val words: Array[String] = data.split(splitKey)

    //遍歷寫出
    words.foreach(x => {
      //將數據放入集合
      var tmp: Array[String] = new Array[String](1)
      tmp(0) = x
      //寫出數據到緩衝區
      forward(tmp)
    })
  }

  override def close(): Unit = {
    //沒有流操做
  }
}

5.10 讀取Json數據拓展

  • 讀取嵌套json數據
{"name":"zhangsan","score":100,"infos":{"age":30,"gender":"man"}},
{"name":"lisi","score":66,"infos":{"age":28,"gender":"feman"}},
{"name":"wangwu","score":77,"infos":{"age":15,"gender":"feman"}}
object SparkSQLDemo8 {
  def main(args: Array[String]): Unit = {
    //建立sparksession
    val spark = SparkSession.builder()
      .appName("demo8")
      .master("local[*]")
      .getOrCreate()

    //讀取嵌套的json文件
    val frame: DataFrame = spark.read.json(this.getClass.getClassLoader.getResource("student.json").getPath)
    frame.createOrReplaceTempView("infosView")
    spark.sql("select name,infos.age,score,infos.gender from infosView").show(100)

    /**
      * +--------+---+-----+------+
      * |    name|age|score|gender|
      * +--------+---+-----+------+
      * |zhangsan| 30|  100|   man|
      * |    lisi| 28|   66| feman|
      * |  wangwu| 15|   77| feman|
      * +--------+---+-----+------+
      **/
  }
}
  • 讀取嵌套jsonArray數據
{"name":"zhangsan","age":18,"scores":[{"yuwen":98,"shuxue":90,"yingyu":100,"xueqi":1},{"yuwen":77,"shuxue":33,"yingyu":55,"xueqi":2}]},
{"name":"lisi","age":19,"scores":[{"yuwen":58,"shuxue":50,"yingyu":78,"xueqi":1},{"yuwen":66,"shuxue":88,"yingyu":66,"xueqi":2}]},
{"name":"wangwu","age":17,"scores":[{"yuwen":18,"shuxue":90,"yingyu":45,"xueqi":1},{"yuwen":88,"shuxue":77,"yingyu":44,"xueqi":2}]},
{"name":"zhaoliu","age":20,"scores":[{"yuwen":68,"shuxue":23,"yingyu":63,"xueqi":1},{"yuwen":44,"shuxue":55,"yingyu":77,"xueqi":2}]},
{"name":"tianqi","age":22,"scores":[{"yuwen":88,"shuxue":91,"yingyu":41,"xueqi":1},{"yuwen":55,"shuxue":66,"yingyu":88,"xueqi":2}]}
object SparkSQLDemo8 {
  def main(args: Array[String]): Unit = {
    //建立sparksession
    val spark = SparkSession.builder()
      .appName("demo8")
      .master("local[*]")
      .getOrCreate()

    //讀取嵌套的json文件
    val frame: DataFrame = spark.read.json(this.getClass.getClassLoader.getResource("student.json").getPath)
    frame.createOrReplaceTempView("infosView")
    spark.sql("select name,age,explode(scores) from infosView")
    //不折疊顯示
    frame.show(false)

    /**
      * +---+--------+-----------------------------------+
      * |age|name    |scores                             |
      * +---+--------+-----------------------------------+
      * |18 |zhangsan|[[90, 1, 100, 98], [33, 2, 55, 77]]|
      * |19 |lisi    |[[50, 1, 78, 58], [88, 2, 66, 66]] |
      * |17 |wangwu  |[[90, 1, 45, 18], [77, 2, 44, 88]] |
      * |20 |zhaoliu |[[23, 1, 63, 68], [55, 2, 77, 44]] |
      * |22 |tianqi  |[[91, 1, 41, 88], [66, 2, 88, 55]] |
      * +---+--------+-----------------------------------+
      */
  }
}

5.11 讀取Mysql數據

  • 使用Mysql
create database spark;
use spark;
create table person(id varchar(12),name varchar(12),age int(10));
insert into person values('1','zhangsan',18),('2','lisi',19),('3','wangwu',20);
object SparkSQLDemo9 {
  def main(args: Array[String]): Unit = {
    //建立sparksession
    val spark = SparkSession.builder()
      .appName("demo9")
      .master("local[*]")
      .getOrCreate()

    val frame: DataFrame = spark.read.format("jdbc")
      .option("url", "jdbc:mysql://localhost:3306/spark")
      .option("driver", "com.mysql.jdbc.Driver")
      .option("user", "root")
      .option("password", "root")
      .option("dbtable", "person")
      .load()

    frame.show()

    /**
      * +---+--------+---+
      * | id|    name|age|
      * +---+--------+---+
      * |  1|zhangsan| 18|
      * |  2|    lisi| 19|
      * |  3|  wangwu| 20|
      * +---+--------+---+
      */
  }
}

5.12 讀取Hive數據

  • 使用Hive
//建立數據庫
CREATE DATABASE dwd
//建立表
CREATE EXTERNAL TABLE `dwd.student`(
  `ID` bigint COMMENT '',
  `CreatedBy` string COMMENT '建立人',
  `CreatedTime` string COMMENT '建立時間',
  `UpdatedBy`  string COMMENT '更新人',
  `UpdatedTime` string COMMENT '更新時間',
  `Version` int COMMENT '版本號',
  `name` string COMMENT '姓名'
  ) COMMENT '學生表'
PARTITIONED BY (
  `dt` String COMMENT 'partition'
)
row format delimited fields terminated by '\t'
stored as parquet
location '/warehouse/dwd/test/student/'
tblproperties ("parquet.compression"="snappy")
//添加數據
INSERT INTO TABLE dwd.student partition(dt='2020-04-05') VALUES(1,"heaton","2020-04-05","","","1","zhangsan") 
INSERT INTO TABLE dwd.student partition(dt='2020-04-06') VALUES(2,"heaton","2020-04-06","","","1","lisi")
  • 將服務端配置hive-site.xml,放入resources路徑
object SparkSQLDemo10 {
  def main(args: Array[String]): Unit = {
    //建立sparksession
    val spark = SparkSession.builder()
      .appName("demo10")
      .master("local[*]")
      .enableHiveSupport() //啓用hive
      .getOrCreate()

    spark.sql("select * from dwd.student").show()
    
    /**
      * +---+---------+-----------+---------+-----------+-------+--------+----------+
      * | id|createdby|createdtime|updatedby|updatedtime|version|    name|        dt|
      * +---+---------+-----------+---------+-----------+-------+--------+----------+
      * |  1|   heaton| 2020-04-05|         |           |      1|zhangsan|2020-04-05|
      * |  2|   heaton| 2020-04-06|         |           |      1|    lisi|2020-04-06|
      * +---+---------+-----------+---------+-----------+-------+--------+----------+
      */
  }
}

5.13 讀取Hbase數據

object SparkSQLDemo11 {
  def main(args: Array[String]): Unit = {
    //建立sparksession
    val spark = SparkSession.builder()
      .appName("demo11")
      .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .master("local[*]")
      .getOrCreate()
    import spark.implicits._
    val hconf: Configuration = HBaseConfiguration.create
    hconf.set(HConstants.ZOOKEEPER_QUORUM, "cdh01.cm,cdh02.cm,cdh03.cm")
    hconf.set(HConstants.ZOOKEEPER_CLIENT_PORT, "2181")
    //必定要建立這個hbaseContext, 由於後面寫入時會用到它,否則空指針
    val hBaseContext = new HBaseContext(spark.sparkContext, hconf)

    //構建DataSet
    val ds1: Dataset[HBaseRecord] = spark.sparkContext.parallelize(1.to(256)).map(i => new HBaseRecord(i, "Hbase")).toDS()

    //定義映射的catalog
    val catalog: String = "{" +
      "       \"table\":{\"namespace\":\"default\", \"name\":\"test1\"}," +
      "       \"rowkey\":\"key\"," +
      "       \"columns\":{" +
      "         \"f0\":{\"cf\":\"rowkey\", \"col\":\"key\", \"type\":\"string\"}," +
      "         \"f1\":{\"cf\":\"cf1\", \"col\":\"f1\", \"type\":\"boolean\"}," +
      "         \"f2\":{\"cf\":\"cf2\", \"col\":\"f2\", \"type\":\"double\"}," +
      "         \"f3\":{\"cf\":\"cf3\", \"col\":\"f3\", \"type\":\"float\"}," +
      "         \"f4\":{\"cf\":\"cf4\", \"col\":\"f4\", \"type\":\"int\"}," +
      "         \"f5\":{\"cf\":\"cf5\", \"col\":\"f4\", \"type\":\"bigint\"}," +
      "         \"f6\":{\"cf\":\"cf6\", \"col\":\"f6\", \"type\":\"smallint\"}," +
      "         \"f7\":{\"cf\":\"cf7\", \"col\":\"f7\", \"type\":\"string\"}," +
      "         \"f8\":{\"cf\":\"cf8\", \"col\":\"f8\", \"type\":\"tinyint\"}" +
      "       }" +
      "     }"

    //數據寫入Hbase
    ds1.write
      .format("org.apache.hadoop.hbase.spark")
      .option(HBaseTableCatalog.tableCatalog, catalog)
      .option(HBaseTableCatalog.newTable, 5)
      .mode(SaveMode.Overwrite) //寫入5個分區
      .save()

    //讀取Hbase數據
    val ds2: DataFrame = spark.read
      .format("org.apache.hadoop.hbase.spark")
      .option(HBaseTableCatalog.tableCatalog, catalog)
      .load()
    ds2.show(10)

    /**
      * +------------+-----+---+---+------------+-----+-----+---+---+
      * |          f7|   f1| f4| f6|          f0|   f3|   f2| f5| f8|
      * +------------+-----+---+---+------------+-----+-----+---+---+
      * |String:Hbase| true|100|100|row100:Hbase|100.0|100.0|100|100|
      * |String:Hbase|false|101|101|row101:Hbase|101.0|101.0|101|101|
      * |String:Hbase| true|102|102|row102:Hbase|102.0|102.0|102|102|
      * |String:Hbase|false|103|103|row103:Hbase|103.0|103.0|103|103|
      * |String:Hbase| true|104|104|row104:Hbase|104.0|104.0|104|104|
      * |String:Hbase|false|105|105|row105:Hbase|105.0|105.0|105|105|
      * |String:Hbase| true|106|106|row106:Hbase|106.0|106.0|106|106|
      * |String:Hbase|false|107|107|row107:Hbase|107.0|107.0|107|107|
      * |String:Hbase| true|108|108|row108:Hbase|108.0|108.0|108|108|
      * |String:Hbase|false|109|109|row109:Hbase|109.0|109.0|109|109|
      * +------------+-----+---+---+------------+-----+-----+---+---+
      */
  }
}

case class HBaseRecord(f0: String, f1: Boolean, f2: Double, f3: Float, f4: Int, f5: Long, f6: Short, f7: String, f8: Byte) {
  def this(i: Int, s: String) {
    this(s"row$i:$s", i % 2 == 0, i.toDouble, i.toFloat, i, i.toLong, i.toShort, s"String:$s", i.toByte)
  }
}

6 Spark Streaming

Spark Streaming是Spark核心API擴展,用於構建彈性、高吞吐、容錯的在線數據流的流式處理程序

數據來源有多種:Kafla、Flume、TCP等

Spark Streaming中提供的高級抽象:Discretized stream,DStream表示一個連續的數據流,能夠由來自數據源的輸入數據流來建立,也能夠經過在其餘DStream上轉換獲得,一個DStream是由一個RDD序列來表示的,對DStream的操做都會轉換成對其裏面的RDD的操做

  • 執行流程

Receiver task 是 7*24h 一直在執行,一直接收數據,將接收到的數據保存到 batch 中,假設 batch interval 爲 5s,
那麼把接收到的數據每隔 5s 切割到一個 batch,由於 batch 是沒有分佈式計算的特性的,而 RDD 有,
因此把 batch 封裝到 RDD 中,又把 RDD 封裝到DStream 中進行計算,在第 5s 的時候,計算前 5s 的數據,
假設計算 5s 的數據只須要 3s,那麼第 5-8s 一邊計算任務,一邊接收數據,第 9-11s 只是接收數據,而後在第 10s 的時
候,循環上面的操做。若是 job 執行時間大於 batch interval,那麼未執行的數據會越攢越多,最終致使 Spark集羣崩潰。

注意:Receiver (接收器)在新版本中已經去除了。

6.1 端口監聽案例

object SparkStreamingDemo1 {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setAppName("demo1").setMaster("local[*]")
    //建立一個10秒封裝一次數據的StreamingContext
    val ssc: StreamingContext = new StreamingContext(conf, Seconds(10))

    //監控cdh01.cm上11111端口
    val lines: ReceiverInputDStream[String] = ssc.socketTextStream("cdh01.cm", 11111)
    val words: DStream[(String, Int)] = lines.flatMap(_.split(" "))
      .map(word => (word, 1))
      .reduceByKey(_ + _)
    //行動算子打印
    words.print()
    
    //啓動StreamingContext並等待終止
    ssc.start()
    ssc.awaitTermination()
  }
}
  • 監聽服務器,間隔10秒發送數據測試以下
nc -lk 11111

6.2 對接Kafka

生產中這種是最經常使用的方式

object SparkStreamingDemo2 {
  def main(args: Array[String]): Unit = {
    val brokers = "cdh01.cm:9092,cdh02.cm:9092,cdh03.cm:9092"
    val topic = "bigdata"
    val cgroup = "test"
    val params: Map[String, Object] = Map(
      ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> brokers,
      ConsumerConfig.GROUP_ID_CONFIG -> cgroup,
      ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> "org.apache.kafka.common.serialization.StringDeserializer",
      ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> "org.apache.kafka.common.serialization.StringDeserializer"
    )

    val conf: SparkConf = new SparkConf().setAppName("demo2").set("spark.serializer", "org.apache.spark.serializer.KryoSerializer").setMaster("local[*]")
    //建立一個10秒封裝一次數據的StreamingContext
    val ssc: StreamingContext = new StreamingContext(conf, Seconds(10))

    //Streaming對接kafka
    val kafkaDStream: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream[String, String](
      ssc,
      LocationStrategies.PreferConsistent,
      ConsumerStrategies.Subscribe[String, String](List(topic), params)
    )

    kafkaDStream.print

    //啓動StreamingContext並等待終止
    ssc.start()
    ssc.awaitTermination()
  }
}
  • 使用Kafka
kafka-console-producer --broker-list cdh01.cm:9092,cdh02.cm:9092,cdh03.cm:9092 --topic bigdata

ConsumerRecord(topic = bigdata, partition = 0, offset = 13, CreateTime = 1587194334601, serialized key size = -1, serialized value size = 1, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = a)
ConsumerRecord(topic = bigdata, partition = 0, offset = 14, CreateTime = 1587194335215, serialized key size = -1, serialized value size = 1, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = b)
ConsumerRecord(topic = bigdata, partition = 0, offset = 15, CreateTime = 1587194335975, serialized key size = -1, serialized value size = 1, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = c)
ConsumerRecord(topic = bigdata, partition = 0, offset = 16, CreateTime = 1587194336887, serialized key size = -1, serialized value size = 1, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = d)
ConsumerRecord(topic = bigdata, partition = 0, offset = 17, CreateTime = 1587194337912, serialized key size = -1, serialized value size = 1, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = e)

6.3 Checkpoint

Spark的一種持久化方式,並不推薦

這種方式很容易作到,可是有如下的缺點:
屢次輸出,結果必須知足冪等性
事務性不可選
若是代碼變動不能從Checkpoint恢復,不過你能夠同時運行新任務和舊任務,由於輸出結果具備等冪性

object SparkStreamingDemo3 {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setAppName("demo3").set("spark.serializer", "org.apache.spark.serializer.KryoSerializer").setMaster("local[*]")
    val ssc: StreamingContext = StreamingContext.getActiveOrCreate("./ck", createSSC)

    //啓動StreamingContext並等待終止
    ssc.start()
    ssc.awaitTermination()
  }

  def createSSC() : StreamingContext = {
    val brokers = "cdh01.cm:9092,cdh02.cm:9092,cdh03.cm:9092"
    val topic = "bigdata"
    val cgroup = "test"
    val params: Map[String, Object] = Map(
      ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> brokers,
      ConsumerConfig.GROUP_ID_CONFIG -> cgroup,
      ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> "org.apache.kafka.common.serialization.StringDeserializer",
      ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> "org.apache.kafka.common.serialization.StringDeserializer"
    )

    val conf: SparkConf = new SparkConf().setAppName("demo2").set("spark.serializer", "org.apache.spark.serializer.KryoSerializer").setMaster("local[*]")
    //建立一個10秒封裝一次數據的StreamingContext
    val ssc: StreamingContext = new StreamingContext(conf, Seconds(10))

    //設置檢查點
    ssc.checkpoint("./ck")

    //Streaming對接kafka
    val kafkaDStream: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream[String, String](
      ssc,
      LocationStrategies.PreferConsistent,
      ConsumerStrategies.Subscribe[String, String](List(topic), params)
    )
    kafkaDStream.print

    ssc
  }
}
  • 使用Kafka
kafka-console-producer --broker-list cdh01.cm:9092,cdh02.cm:9092,cdh03.cm:9092 --topic bigdata

ConsumerRecord(topic = bigdata, partition = 0, offset = 18, CreateTime = 1587195534875, serialized key size = -1, serialized value size = 1, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = 1)
ConsumerRecord(topic = bigdata, partition = 0, offset = 19, CreateTime = 1587195535127, serialized key size = -1, serialized value size = 1, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = 2)
ConsumerRecord(topic = bigdata, partition = 0, offset = 20, CreateTime = 1587195535439, serialized key size = -1, serialized value size = 1, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = 3)
ConsumerRecord(topic = bigdata, partition = 0, offset = 21, CreateTime = 1587195535903, serialized key size = -1, serialized value size = 1, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = 4)

  • 將程序關閉,在Kafka中繼續寫入數據,在啓動程序

ConsumerRecord(topic = bigdata, partition = 0, offset = 22, CreateTime = 1587195646015, serialized key size = -1, serialized value size = 1, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = 5)
ConsumerRecord(topic = bigdata, partition = 0, offset = 23, CreateTime = 1587195646639, serialized key size = -1, serialized value size = 1, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = 6)
ConsumerRecord(topic = bigdata, partition = 0, offset = 24, CreateTime = 1587195647207, serialized key size = -1, serialized value size = 1, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = 7)
ConsumerRecord(topic = bigdata, partition = 0, offset = 25, CreateTime = 1587195647647, serialized key size = -1, serialized value size = 1, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = 8)

6.4 轉換算子

Transformation 含義
map(func) 經過函數func傳遞源DStream的每一個元素,返回一個新的DStream。
flatMap(func) 相似於map,可是每一個輸入項能夠映射到0或多個輸出項。
filter(func) 經過只選擇func返回true的源DStream的記錄來返回一個新的DStream。
repartition(numPartitions) 重分區,經過建立或多或少的分區來更改此DStream中的並行度級別。
union(otherStream) 返回一個新的DStream,它包含源DStream和其餘DStream中的元素的聯合。
count() 經過計算源DStream的每一個RDD中的元素數量,返回一個新的單元素RDD DStream。
reduce(func) 使用func函數(函數接受兩個參數並返回一個參數)聚合源DStream的每一個RDD中的元素,從而返回單元素RDDs的新DStream。這個函數應該是結合律和交換律的,這樣才能並行計算。
countByValue() 當對K類型的元素的DStream調用時,返回一個新的(K, Long)對的DStream,其中每一個鍵的值是它在源DStream的每一個RDD中的頻率。
reduceByKey(func, [numTasks]) 當對(K, V)對的DStream調用時,返回一個新的(K, V)對的DStream,其中每一個鍵的值使用給定的reduce函數進行聚合。注意:默認狀況下,這將使用Spark的默認並行任務數量(本地模式爲2,在集羣模式下,該數量由config屬性Spark .default.parallelism決定)來進行分組。咱們能夠傳遞一個可選的numTasks參數來設置不一樣數量的任務。
join(otherStream, [numTasks]) 當調用兩個(K, V)和(K, W)對的DStream時,返回一個新的(K, (V, W))對的DStream,其中包含每一個Key的全部元素對。
cogroup(otherStream, [numTasks]) 當調用(K, V)和(K, W)對的DStream時,返回一個新的(K, Seq[V], Seq[W])元組DStream。
transform(func) 經過將RDD-to-RDD函數應用於源DStream的每一個RDD,返回一個新的DStream。它能夠用於應用DStream API中沒有公開的任何RDD操做。例如將數據流中的每一個批處理與另外一個數據集鏈接的功能並不直接在DStream API中公開。可是你能夠很容易地使用transform來實現這一點。這帶來了很是強大的可能性。例如,能夠經過將輸入數據流與預先計算的垃圾信息(也多是使用Spark生成的)結合起來進行實時數據清理
updateStateByKey(func) 返回一個新的「state」DStream,其中每一個Key的狀態經過將給定的函數應用於Key的前一個狀態和Key的新值來更新。這能夠用於維護每一個Key的任意狀態數據。要使用它,您須要執行兩個步驟:(1).定義狀態——狀態能夠是任意數據類型;(2).定義狀態更新函數——用函數指定如何使用輸入流中的前一個狀態和新值更新狀態。
object SparkStreamingDemo4 {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setAppName("demo4").setMaster("local[*]")
    //建立一個10秒封裝一次數據的StreamingContext
    val ssc: StreamingContext = new StreamingContext(conf, Seconds(10))

    //監控cdh01.cm上11111端口
    val lines: ReceiverInputDStream[String] = ssc.socketTextStream("cdh01.cm", 11111)

    //轉換成RDD操做
    val words: DStream[(String, Int)] = lines.transform(rdd => {
      rdd.flatMap(_.split(" ")).map(word => (word, 1)).reduceByKey(_ + _)
    })

    //行動算子打印
    words.print()

    //啓動StreamingContext並等待終止
    ssc.start()
    ssc.awaitTermination()
  }
}
  • 監聽服務器,間隔10秒發送數據測試以下
nc -lk 11111

6.5 行動算子

Output Operation 含義
print() 在運行流應用程序的驅動程序節點上打印DStream中每批數據的前10個元素。這對於開發和調試很是有用。這在Python API中稱爲pprint()。
saveAsTextFiles(prefix, [suffix]) 將此DStream的內容保存爲文本文件。每一個批處理間隔的文件名是根據前綴和後綴生成的:「prefix- time_in_ms [.suffix]」。
saveAsObjectFiles(prefix, [suffix]) 將此DStream的內容保存爲序列化Java對象的sequencefile。每一個批處理間隔的文件名是根據前綴和後綴生成的:「prefix- time_in_ms [.suffix]」。這在Python API中是不可用的。
saveAsHadoopFiles(prefix, [suffix]) 將這個DStream的內容保存爲Hadoop文件。每一個批處理間隔的文件名是根據前綴和後綴生成的:「prefix- time_in_ms [.suffix]」。這在Python API中是不可用的。
foreachRDD(func) 對流生成的每一個RDD應用函數func的最通用輸出操做符。這個函數應該將每一個RDD中的數據推送到外部系統,例如將RDD保存到文件中,或者經過網絡將其寫入數據庫。請注意,函數func是在運行流應用程序的驅動程序進程中執行的,其中一般會有RDD操做,這將強制流RDDs的計算。在func中建立遠程鏈接時能夠使用foreachPartition 替換foreach操做以下降系統的整體吞吐量

6.6 有狀態轉換

使用updateStateByKey配合檢查點,能夠作到從頭開始保存數據。

object SparkStreamingDemo5 {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setAppName("demo5").setMaster("local[*]")
    //建立一個10秒封裝一次數據的StreamingContext
    val ssc: StreamingContext = new StreamingContext(conf, Seconds(10))
    //使用updateStateByKey必須設置檢查點
    ssc.checkpoint("./ck")

    //監控cdh01.cm上11111端口
    val lines: ReceiverInputDStream[String] = ssc.socketTextStream("cdh01.cm", 11111)

    def f(seq: Seq[Int], opt: Option[Int]): Some[Int] = {
      Some(seq.sum + opt.getOrElse(0)
      )
    }

    //使用updateStateByKey,根據Key保存前面接收序列裏的數據爲一個序列
    val words: DStream[(String, Int)] = lines.flatMap(_.split(" ")).map((_, 1)).updateStateByKey(f)

    //行動算子打印
    words.print()

    //啓動StreamingContext並等待終止
    ssc.start()
    ssc.awaitTermination()
  }
}
  • 監聽服務器,間隔10秒發送數據測試以下
nc -lk 11111

(aa,1)
(dd,1)
(bb,1)
(cc,1)

  • 間隔10秒後

(aa,3)
(dd,1)
(bb,1)
(cc,3)

6.7 窗口函數

窗口計算,容許你在滑動的數據窗口上應用轉換。

每當窗口滑過源DStream時,屬於該窗口的源RDDs就被組合起來並對其進行操做,從而生成窗口化DStream的RDDs。

上圖中操做應用於最後3個時間單位的數據,並以2個時間單位進行移動。這代表任何窗口操做都須要指定兩個參數:

窗口長度(windowLength)——窗口的持續時間

滑動間隔(slideInterval)——執行窗口操做的間隔

這兩個參數必須是批處理間隔的倍數

Transformation 含義
window(windowLength, slideInterval) 返回一個新的DStream,它是基於源DStream的窗口批次計算的。
countByWindow(windowLength, slideInterval) 返回流中元素的滑動窗口計數。
reduceByWindow(func, windowLength, slideInterval) 返回一個新的單元素流,該流是使用func在滑動間隔上聚合流中的元素建立的。這個函數應該是結合律和交換律的,這樣才能並行地正確計算。
reduceByKeyAndWindow(func, windowLength, slideInterval, [numTasks]) 當對(K, V)對的DStream調用時,返回一個新的(K, V)對的DStream,其中每一個Key的值使用給定的reduce函數func在滑動窗口中分批聚合。注意:默認狀況下,這將使用Spark的默認並行任務數量(本地模式爲2,在集羣模式下,該數量由config屬性Spark .default.parallelism決定)來進行分組。您能夠傳遞一個可選的numTasks參數來設置不一樣數量的任務。
reduceByKeyAndWindow(func, invFunc, windowLength, slideInterval, [numTasks]) 上面reduceByKeyAndWindow()的一個更有效的版本,其中每一個窗口的reduce值是使用前一個窗口的reduce值增量計算的。這是經過減小進入滑動窗口的新數據和「反向減小」離開窗口的舊數據來實現的。例如,在窗口滑動時「添加」和「減去」鍵的計數。可是,它只適用於「可逆約簡函數」,即具備相應「逆約簡」函數的約簡函數(取invFunc參數)。與reduceByKeyAndWindow相似,reduce任務的數量能夠經過一個可選參數進行配置。注意,必須啓用checkpoint才能使用此操做。
countByValueAndWindow(windowLength, slideInterval, [numTasks]) 當對(K, V)對的DStream調用時,返回一個新的(K, Long)對的DStream,其中每一個Key的值是它在滑動窗口中的頻率。與reduceByKeyAndWindow相似,reduce任務的數量能夠經過一個可選參數進行配置。
object SparkStreamingDemo6 {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setAppName("demo6").setMaster("local[*]")
    //建立一個10秒封裝一次數據的StreamingContext
    val ssc: StreamingContext = new StreamingContext(conf, Seconds(5))

    //監控cdh01.cm上11111端口
    val lines: ReceiverInputDStream[String] = ssc.socketTextStream("cdh01.cm", 11111)
    //    Duration.of(10,TimeUnit.SECONDS)

    //使用窗口,比封裝數據時間多一倍,意思是至關於包含兩個窗口,滑動間隔爲一個窗口
    val words: DStream[(String, Int)] = lines.window(Seconds(10), Seconds(5)).flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _)

    //行動算子打印
    words.print()

    //啓動StreamingContext並等待終止
    ssc.start()
    ssc.awaitTermination()
  }
}
  • 監聽服務器,間隔10秒發送數據測試以下
nc -lk 11111

每間隔5秒輸如一行,結果集以下

(aa,1)
(bb,1)
(cc,1)


(aa,2)
(bb,2)
(cc,1)


(aa,2)
(bb,1)
(cc,1)

  • 圖解

7 Spark內存管理

7.1 堆內和堆外內存

做爲一個JVM進程,Executor的內存管理創建在JVM的內存管理之上,Spark對JVM的堆內(On-head)空間進行了更爲詳細的分配,以充分利用內存。同時,Spark引入了堆外(Off-head)內存,使之能夠直接在工做節點的系統內存中開闢空間,進一步優化了內存的使用。堆內內存受到JVM統一管理,堆外內存式直接向操做系統進行內存的申請和釋放。

  • 堆內內存

堆內內存的大小,由Spark應用程序啓動時的 executor-memoryspark.executor.memory參數配置,Executor內運行的併發任務共享JVM堆內內存,這些任務在緩存RDD數據和廣播(Broadcast)數據時佔用的內存被規劃爲儲存(Storage)內存,而這些任務在執行Shuffle時佔用的內存被規劃委執行(Executor)內存,剩餘的部分不作特殊規劃,那些Spark內部的對象實例,或者用戶定義的Spark應用程序中的對象實例,均佔用剩餘的空間,不一樣的管理模式下,這三部分佔用的空間大小各部相同

Spark對堆內內存的管理是一種邏輯上的「規劃式」管理,由於對象實際佔用內存的申請和釋放都是由JVM完成,Spark只能在申請後和釋放前記錄這些內存

申請內存流程以下:

  1. Spark記錄該對象釋放的內存,刪除該對象的引用
  2. 等待JVM的垃圾回收機制釋放該對象佔用的堆內內存

JVM的對象能夠序列化的方式儲存,序列化的過程是將對象轉換成爲二進制字節流,本質上能夠理解爲將非連續空間的鏈式儲存轉化爲連續空間或塊式儲存,在訪問時則須要進行序列化的逆過程--反序列化,將字節流轉化成對象,序列化的方式能夠節省存儲空間,但增長了內存的讀取時候的計算開銷

對於 Spark 中序列化的對象,因爲是字節流的形式,其佔用的內存大小可直接計算,而對於非序列化的對象,其佔用的內存是經過週期性地採樣近似估算而得,即並非每次新增的數據項都會計算一次佔用的內存大小,這種方法下降了時間開銷可是有可能偏差較大,致使某一時刻的實際內存有可能遠遠超出預期[2]。此外,在被 Spark 標記爲釋放的對象實例,頗有可能在實際上並無被 JVM 回收,致使實際可用的內存小於 Spark 記錄的可用內存。因此 Spark 並不能準確記錄實際可用的堆內內存,從而也就沒法徹底避免內存溢出(OOM, Out of Memory)的異常。

雖然不能精準控制堆內內存的申請和釋放,但 Spark 經過對存儲內存和執行內存各自獨立的規劃管理,能夠決定是否要在存儲內存裏緩存新的 RDD,以及是否爲新的任務分配執行內存,在必定程度上能夠提高內存的利用率,減小異常的出現。

  • 堆外內存

爲了進一步優化內存的使用以及提升 Shuffle 時排序的效率,Spark 引入了堆外(Off-heap)內存,使之能夠直接在工做節點的系統內存中開闢空間,存儲通過序列化的二進制數據。利用 JDK Unsafe API(從 Spark 2.0 開始,在管理堆外的存儲內存時再也不基於 Tachyon,而是與堆外的執行內存同樣,基於 JDK Unsafe API 實現[3]),Spark 能夠直接操做系統堆外內存,減小了沒必要要的內存開銷,以及頻繁的 GC 掃描和回收,提高了處理性能。堆外內存能夠被精確地申請和釋放,並且序列化的數據佔用的空間能夠被精確計算,因此相比堆內內存來講下降了管理的難度,也下降了偏差。

在默認狀況下堆外內存並不啓用,可經過配置 spark.memory.offHeap.enabled 參數啓用,並由 spark.memory.offHeap.size 參數設定堆外空間的大小。除了沒有 other 空間,堆外內存與堆內內存的劃分方式相同,全部運行中的併發任務共享存儲內存和執行內存。

7.2 內存空間管理

7.2.1 靜態內存管理

在 Spark 最初採用的靜態內存管理機制下,存儲內存、執行內存和其餘內存的大小在 Spark 應用程序運行期間均爲固定的,但用戶能夠應用程序啓動前進行配置

  • 靜態內存管理圖-堆內

可用的堆內內存的大小須要按照下面的方式計算

  1. 可用的存儲內存 = systemMaxMemory * spark.storage.memoryFraction * spark.storage.safetyFraction
  2. 可用的執行內存 = systemMaxMemory * spark.shuffle.memoryFraction * spark.shuffle.safetyFraction

其中 systemMaxMemory 取決於當前 JVM 堆內內存的大小,最後可用的執行內存或者存儲內存要在此基礎上與各自的 memoryFraction 參數和 safetyFraction 參數相乘得出。上述計算公式中的兩個 safetyFraction 參數,其意義在於在邏輯上預留出 1-safetyFraction 這麼一塊保險區域,下降因實際內存超出當前預設範圍而致使 OOM 的風險(上文提到,對於非序列化對象的內存採樣估算會產生偏差)。值得注意的是,這個預留的保險區域僅僅是一種邏輯上的規劃,在具體使用時 Spark 並無區別對待,和「其它內存」同樣交給了 JVM 去管理。

堆外的空間分配較爲簡單,只有存儲內存和執行內存,如圖所示。可用的執行內存和存儲內存佔用的空間大小直接由參數 spark.memory.storageFraction 決定,因爲堆外內存佔用的空間能夠被精確計算,因此無需再設定保險區域。

  • 靜態內存管理圖- 堆外

靜態內存管理機制實現起來較爲簡單,但若是用戶不熟悉 Spark 的存儲機制,或沒有根據具體的數據規模和計算任務或作相應的配置,很容易形成"一半海水,一半火焰"的局面,即存儲內存和執行內存中的一方剩餘大量的空間,而另外一方卻早早被佔滿,不得不淘汰或移出舊的內容以存儲新的內容。因爲新的內存管理機制的出現,這種方式目前已經不多有開發者使用,出於兼容舊版本的應用程序的目的,Spark 仍然保留了它的實現。

7.2.2 統一內存管理

  • 動態佔用機制圖

憑藉統一內存管理機制,Spark 在必定程度上提升了堆內和堆外內存資源的利用率,下降了開發者維護 Spark 內存的難度,但並不意味着開發者能夠高枕無憂。譬如,因此若是存儲內存的空間太大或者說緩存的數據過多,反而會致使頻繁的全量垃圾回收,下降任務執行時的性能,由於緩存的 RDD 數據一般都是長期駐留內存的 。因此要想充分發揮 Spark 的性能,須要開發者進一步瞭解存儲內存和執行內存各自的管理方式和實現原理。

7.2.3 存儲內存管理

  • RDD 的持久化機制

彈性分佈式數據集(RDD)做爲 Spark 最根本的數據抽象,是隻讀的分區記錄(Partition)的集合,只能基於在穩定物理存儲中的數據集上建立,或者在其餘已有的 RDD 上執行轉換(Transformation)操做產生一個新的 RDD。轉換後的 RDD 與原始的 RDD 之間產生的依賴關係,構成了血統(Lineage)。憑藉血統,Spark 保證了每個 RDD 均可以被從新恢復。但 RDD 的全部轉換都是惰性的,即只有當一個返回結果給 Driver 的行動(Action)發生時,Spark 纔會建立任務讀取 RDD,而後真正觸發轉換的執行。

Task 在啓動之初讀取一個分區時,會先判斷這個分區是否已經被持久化,若是沒有則須要檢查 Checkpoint 或按照血統從新計算。因此若是一個 RDD 上要執行屢次行動,能夠在第一次行動中使用 persist 或 cache 方法,在內存或磁盤中持久化或緩存這個 RDD,從而在後面的行動時提高計算速度。事實上,cache 方法是使用默認的 MEMORY_ONLY 的存儲級別將 RDD 持久化到內存,故緩存是一種特殊的持久化。 堆內和堆外存儲內存的設計,即可以對緩存 RDD 時使用的內存作統一的規劃和管 理 (存儲內存的其餘應用場景,如緩存 broadcast 數據,暫時不在本文的討論範圍以內)。

RDD 的持久化由 Spark 的 Storage 模塊 [7] 負責,實現了 RDD 與物理存儲的解耦合。Storage 模塊負責管理 Spark 在計算過程當中產生的數據,將那些在內存或磁盤、在本地或遠程存取數據的功能封裝了起來。在具體實現時 Driver 端和 Executor 端的 Storage 模塊構成了主從式的架構,即 Driver 端的 BlockManager 爲 Master,Executor 端的 BlockManager 爲 Slave。Storage 模塊在邏輯上以 Block 爲基本存儲單位,RDD 的每一個 Partition 通過處理後惟一對應一個 Block(BlockId 的格式爲 rdd_RDD-ID_PARTITION-ID )。Master 負責整個 Spark 應用程序的 Block 的元數據信息的管理和維護,而 Slave 須要將 Block 的更新等狀態上報到 Master,同時接收 Master 的命令,例如新增或刪除一個 RDD。

  • Storage 模塊示意圖

在對 RDD 持久化時,Spark 規定的存儲級別以下

  • DISK_ONLY:持久化到磁盤
  • DISK_ONLY_2:持久化到磁盤而且存一個副本(2個文件)
  • MEMORY_ONLY:持久化到內存
  • MEMORY_ONLY_2:持久化到內存而且存一個副本(2個文件)
  • MEMORY_ONLY_SER:持久化到內存,而且序列化
  • MEMORY_ONLY_SER_2:持久化到內存,而且序列化,還要存一個副本(2個文件)
  • MEMORY_AND_DISK:持久化到內存和磁盤
  • MEMORY_AND_DISK_2:持久化到內存和磁盤而且存一個副本(2個文件)
  • MEMORY_AND_DISK_SER:持久化到內存和磁盤,而且序列化
  • MEMORY_AND_DISK_SER_2:持久化到內存和磁盤,而且序列化,還要存一個副本(2個文件)
  • OFF_HEAP:持久化在堆外內存中,Spark本身管理的內存

經過對數據結構的分析,能夠看出存儲級別從三個維度定義了 RDD 的 Partition(同時也就是 Block)的存儲方式:

  • 存儲位置:磁盤/堆內內存/堆外內存。如 MEMORY_AND_DISK 是同時在磁盤和堆內內存上存儲,實現了冗餘備份。OFF_HEAP 則是隻在堆外內存存儲,目前選擇堆外內存時不能同時存儲到其餘位置。
  • 存儲形式:Block 緩存到存儲內存後,是否爲非序列化的形式。如 MEMORY_ONLY 是非序列化方式存儲,OFF_HEAP 是序列化方式存儲。
  • 副本數量:大於 1 時須要遠程冗餘備份到其餘節點。如 DISK_ONLY_2 須要遠程備份 1 個副本。
  • RDD 緩存的過程

RDD 在緩存到存儲內存以前,Partition 中的數據通常以迭代器(Iterator)的數據結構來訪問,這是 Scala 語言中一種遍歷數據集合的方法。經過 Iterator 能夠獲取分區中每一條序列化或者非序列化的數據項(Record),這些 Record 的對象實例在邏輯上佔用了 JVM 堆內內存的 other 部分的空間,同一 Partition 的不一樣 Record 的空間並不連續。

RDD 在緩存到存儲內存以後,Partition 被轉換成 Block,Record 在堆內或堆外存儲內存中佔用一塊連續的空間。將Partition由不連續的存儲空間轉換爲連續存儲空間的過程,Spark稱之爲"展開"(Unroll)。Block 有序列化和非序列化兩種存儲格式,具體以哪一種方式取決於該 RDD 的存儲級別。非序列化的 Block 以一種 DeserializedMemoryEntry 的數據結構定義,用一個數組存儲全部的對象實例,序列化的 Block 則以 SerializedMemoryEntry的數據結構定義,用字節緩衝區(ByteBuffer)來存儲二進制數據。每一個 Executor 的 Storage 模塊用一個鏈式 Map 結構(LinkedHashMap)來管理堆內和堆外存儲內存中全部的 Block 對象的實例[6],對這個 LinkedHashMap 新增和刪除間接記錄了內存的申請和釋放。

由於不能保證存儲空間能夠一次容納 Iterator 中的全部數據,當前的計算任務在 Unroll 時要向 MemoryManager 申請足夠的 Unroll 空間來臨時佔位,空間不足則 Unroll 失敗,空間足夠時能夠繼續進行。對於序列化的 Partition,其所需的 Unroll 空間能夠直接累加計算,一次申請。而非序列化的 Partition 則要在遍歷 Record 的過程當中依次申請,即每讀取一條 Record,採樣估算其所需的 Unroll 空間並進行申請,空間不足時能夠中斷,釋放已佔用的 Unroll 空間。若是最終 Unroll 成功,當前 Partition 所佔用的 Unroll 空間被轉換爲正常的緩存 RDD 的存儲空間。

  • Spark Unroll 示意圖

在靜態內存管理時,Spark 在存儲內存中專門劃分了一塊 Unroll 空間,其大小是固定的,統一內存管理時則沒有對 Unroll 空間進行特別區分,當存儲空間不足時會根據動態佔用機制進行處理。

  • 淘汰和落盤

因爲同一個 Executor 的全部的計算任務共享有限的存儲內存空間,當有新的 Block 須要緩存可是剩餘空間不足且沒法動態佔用時,就要對 LinkedHashMap 中的舊 Block 進行淘汰(Eviction),而被淘汰的 Block 若是其存儲級別中同時包含存儲到磁盤的要求,則要對其進行落盤(Drop),不然直接刪除該 Block。

存儲內存的淘汰規則爲:

  • 被淘汰的舊 Block 要與新 Block 的 MemoryMode 相同,即同屬於堆外或堆內內存
  • 新舊 Block 不能屬於同一個 RDD,避免循環淘汰
  • 舊 Block 所屬 RDD 不能處於被讀狀態,避免引起一致性問題
  • 遍歷 LinkedHashMap 中 Block,按照最近最少使用(LRU)的順序淘汰,直到知足新 Block 所需的空間。其中 LRU 是 LinkedHashMap 的特性。

落盤的流程則比較簡單,若是其存儲級別符合_useDisk 爲 true 的條件,再根據其_deserialized 判斷是不是非序列化的形式,如果則對其進行序列化,最後將數據存儲到磁盤,在 Storage 模塊中更新其信息。

7.2.4 執行內存管理

  • 多任務間內存分配

Executor 內運行的任務一樣共享執行內存,Spark 用一個 HashMap 結構保存了任務到內存耗費的映射。每一個任務可佔用的執行內存大小的範圍爲 1/2N ~ 1/N,其中 N 爲當前 Executor 內正在運行的任務的個數。每一個任務在啓動之時,要向 MemoryManager 請求申請最少爲 1/2N 的執行內存,若是不能被知足要求則該任務被阻塞,直到有其餘任務釋放了足夠的執行內存,該任務才能夠被喚醒

  • Shuffle 的內存佔用

執行內存主要用來存儲任務在執行 Shuffle 時佔用的內存,Shuffle 是按照必定規則對 RDD 數據從新分區的過程,咱們來看 Shuffle 的 Write 和 Read 兩階段對執行內存的使用:

Shuffle Write

一、若在 map 端選擇普通的排序方式,會採用 ExternalSorter 進行外排,在內存中存儲數據時主要佔用堆內執行空間。

二、若在 map 端選擇 Tungsten 的排序方式,則採用 ShuffleExternalSorter 直接對以序列化形式存儲的數據排序,在內存中存儲數據時能夠佔用堆外或堆內執行空間,取決於用戶是否開啓了堆外內存以及堆外執行內存是否足夠。

Shuffle Read

一、在對 reduce 端的數據進行聚合時,要將數據交給 Aggregator 處理,在內存中存儲數據時佔用堆內執行空間。

二、若是須要進行最終結果排序,則要將再次將數據交給 ExternalSorter 處理,佔用堆內執行空間。

在 ExternalSorter 和 Aggregator 中,Spark 會使用一種叫 AppendOnlyMap 的哈希表在堆內執行內存中存儲數據,但在 Shuffle 過程當中全部數據並不能都保存到該哈希表中,當這個哈希表佔用的內存會進行週期性地採樣估算,當其大到必定程度,沒法再從 MemoryManager 申請到新的執行內存時,Spark 就會將其所有內容存儲到磁盤文件中,這個過程被稱爲溢存(Spill),溢存到磁盤的文件最後會被歸併(Merge)。

Shuffle Write 階段中用到的 Tungsten 是 Databricks 公司提出的對 Spark 優化內存和 CPU 使用的計劃,解決了一些 JVM 在性能上的限制和弊端。Spark 會根據 Shuffle 的狀況來自動選擇是否採用 Tungsten 排序。Tungsten 採用的頁式內存管理機制創建在 MemoryManager 之上,即 Tungsten 對執行內存的使用進行了一步的抽象,這樣在 Shuffle 過程當中無需關心數據具體存儲在堆內仍是堆外。每一個內存頁用一個 MemoryBlock 來定義,並用 Object obj 和 long offset 這兩個變量統一標識一個內存頁在系統內存中的地址。堆內的 MemoryBlock 是以 long 型數組的形式分配的內存,其 obj 的值爲是這個數組的對象引用,offset 是 long 型數組的在 JVM 中的初始偏移地址,二者配合使用能夠定位這個數組在堆內的絕對地址;堆外的 MemoryBlock 是直接申請到的內存塊,其 obj 爲 null,offset 是這個內存塊在系統內存中的 64 位絕對地址。Spark 用 MemoryBlock 巧妙地將堆內和堆外內存頁統一抽象封裝,並用頁表(pageTable)管理每一個 Task 申請到的內存頁。

Tungsten 頁式管理下的全部內存用 64 位的邏輯地址表示,由頁號和頁內偏移量組成:

  • 頁號:佔 13 位,惟一標識一個內存頁,Spark 在申請內存頁以前要先申請空閒頁號。
  • 頁內偏移量:佔 51 位,是在使用內存頁存儲數據時,數據在頁內的偏移地址。

有了統一的尋址方式,Spark 能夠用 64 位邏輯地址的指針定位到堆內或堆外的內存,整個 Shuffle Write 排序的過程只須要對指針進行排序,而且無需反序列化,整個過程很是高效,對於內存訪問效率和 CPU 使用效率帶來了明顯的提高。

Spark 的存儲內存和執行內存有着大相徑庭的管理方式:對於存儲內存來講,Spark 用一個 LinkedHashMap 來集中管理全部的 Block,Block 由須要緩存的 RDD 的 Partition 轉化而成;而對於執行內存,Spark 用 AppendOnlyMap 來存儲 Shuffle 過程當中的數據,在 Tungsten 排序中甚至抽象成爲頁式內存管理,開闢了全新的 JVM 內存管理機制。

8 常規性能調優

8.1 最優資源配置

Spark性能調優的第一步,就是爲任務分配更多的資源,在必定範圍內,增長資源的分配與性能的提高是成正比的,實現了最優的資源配置後,在此基礎上再考慮進行後面論述的性能調優策略。

  • 資源的分配在使用腳本提交Spark任務時進行指定,標準的Spark任務提交腳本以下:
spark-submit \
 
--class com.xxx.spark.TestSpark \
 
--num-executors 80 \
 
--driver-memory 6g \
 
--executor-memory 6g \
 
--executor-cores 3 \
 
/usr/opt/modules/spark/jar/spark.jar
名稱 說明
--num-executors 配置Executor的數量
--driver-memory 配置Driver內存(影響不大)
--executor-memory 配置每一個Executor的內存大小
--executor-cores 配置每一個Executor的CPU core數量
  • 調節原則:儘可能將任務分配的資源調節到能夠使用的資源的最大限度。

  • 對於具體資源的分配,咱們分別討論Spark的兩種Cluster運行模式:

    • 第一種是Spark Standalone模式,你在提交任務前,必定知道或者能夠從運維部門獲取到你能夠使用的資源狀況,在編寫submit腳本的時候,就根據可用的資源狀況進行資源的分配,好比說集羣有15臺機器,每臺機器爲8G內存,2個CPU core,那麼就指定15個Executor,每一個Executor分配8G內存,2個CPU core。
    • 第二種是Spark Yarn模式,因爲Yarn使用資源隊列進行資源的分配和調度,在表寫submit腳本的時候,就根據Spark做業要提交到的資源隊列,進行資源的分配,好比資源隊列有400G內存,100個CPU core,那麼指定50個Executor,每一個Executor分配8G內存,2個CPU core。
  • 各項資源進行了調節後,獲得的性能提高以下表

名稱 解析
增長Executor個數 在資源容許的狀況下,增長Executor的個數能夠提升執行task的並行度。
好比有4個Executor,每一個Executor有2個CPU core,那麼能夠並行執行8個task,
若是將Executor的個數增長到8個(資源容許的狀況下),那麼能夠並行執行16個task,此時的並行能力提高了一倍。
增長每一個Executor的CPU core個數 在資源容許的狀況下,增長每一個Executor的Cpu core個數,能夠提升執行task的並行度。
好比有4個Executor,每一個Executor有2個CPU core,那麼能夠並行執行8個task,
若是將每一個Executor的CPU core個數增長到4個(資源容許的狀況下),
那麼能夠並行執行16個task,此時的並行能力提高了一倍
增長每一個Executor的內存量 在資源容許的狀況下,增長每一個Executor的內存量之後,對性能的提高有三點:
能夠緩存更多的數據(即對RDD進行cache),寫入磁盤的數據相應減小,
甚至能夠不寫入磁盤,減小了可能的磁盤IO;
能夠爲shuffle操做提供更多內存,即有更多空間來存放reduce端拉取的數據,
寫入磁盤的數據相應減小,甚至能夠不寫入磁盤,減小了可能的磁盤IO;
能夠爲task的執行提供更多內存,在task的執行過程當中可能建立不少對象,
內存較小時會引起頻繁的GC,增長內存後,能夠避免頻繁的GC,提高總體性能。
  • 生產環境Spark submit腳本配置
spark-submit \
 
--class com.xxx.spark.WordCount \
 
--num-executors 80 \
 
--driver-memory 6g \
 
--executor-memory 6g \
 
--executor-cores 3 \
 
--master yarn-cluster \
 
--queue root.default \
 
--conf spark.yarn.executor.memoryOverhead=2048 \
 
--conf spark.core.connection.ack.wait.timeout=300 \
 
/usr/local/spark/spark.jar

參數配置參考值:

--num-executors:50~100

--driver-memory:1G~5G

--executor-memory:6G~10G

--executor-cores:3

--master:實際生產環境必定使用yarn-cluster

8.2 RDD優化

8.2.1 RDD複用

  • 在對RDD進行算子時,要避免相同的算子和計算邏輯之下對RDD進行重複的計算

  • 對上圖中的RDD計算架構進行修改,獲得如圖所示的優化結果

8.2.2 RDD持久化

在Spark中,當屢次對同一個RDD執行算子操做時,每一次都會對這個RDD以以前的父RDD從新計算一次,這種狀況是必需要避免的,對同一個RDD的重複計算是對資源的極大浪費,所以,必須對屢次使用的RDD進行持久化,經過持久化將公共RDD的數據緩存到內存/磁盤中,以後對於公共RDD的計算都會從內存/磁盤中直接獲取RDD數據。

  • 對於RDD的持久化,有兩點須要說明:
    1. RDD的持久化是能夠進行序列化的,當內存沒法將RDD的數據完整的進行存放的時候,能夠考慮使用序列化的方式減少數據體積,將數據完整存儲在內存中。
    2. 若是對於數據的可靠性要求很高,而且內存充足,能夠使用副本機制,對RDD數據進行持久化。當持久化啓用了複本機制時,對於持久化的每一個數據單元都存儲一個副本,放在其餘節點上面,由此實現數據的容錯,一旦一個副本數據丟失,不須要從新計算,還能夠使用另一個副本。

8.2.3 RDD儘量早的filter操做

  • 獲取到初始RDD後,應該考慮儘早地過濾掉不須要的數據,進而減小對內存的佔用,從而提高Spark做業的運行效率。

8.3 並行度調節

Spark做業中的並行度指各個stage的task的數量。

若是並行度設置不合理而致使並行度太低,會致使資源的極大浪費,例如,20個Executor,每一個Executor分配3個CPU core,而Spark做業有40個task,這樣每一個Executor分配到的task個數是2個,這就使得每一個Executor有一個CPU core空閒,致使資源的浪費。

理想的並行度設置,應該是讓並行度與資源相匹配,簡單來講就是在資源容許的前提下,並行度要設置的儘量大,達到能夠充分利用集羣資源。合理的設置並行度,能夠提高整個Spark做業的性能和運行速度。

Spark官方推薦,task數量應該設置爲Spark做業總CPU core數量的2~3倍。之因此沒有推薦task數量與CPU core總數相等,是由於task的執行時間不一樣,有的task執行速度快而有的task執行速度慢,若是task數量與CPU core總數相等,那麼執行快的task執行完成後,會出現CPU core空閒的狀況。若是task數量設置爲CPU core總數的2~3倍,那麼一個task執行完畢後,CPU core會馬上執行下一個task,下降了資源的浪費,同時提高了Spark做業運行的效率。

  • Spark做業並行度的設置以下:
val conf = new SparkConf().set("spark.default.parallelism", "500")

8.4 廣播大變量

默認狀況下,task中的算子中若是使用了外部的變量,每一個task都會獲取一份變量的複本,這就形成了內存的極大消耗。一方面,若是後續對RDD進行持久化,可能就沒法將RDD數據存入內存,只能寫入磁盤,磁盤IO將會嚴重消耗性能;另外一方面,task在建立對象的時候,也許會發現堆內存沒法存放新建立的對象,這就會致使頻繁的GC,GC會致使工做線程中止,進而致使Spark暫停工做一段時間,嚴重影響Spark性能。

假設當前任務配置了20個Executor,指定500個task,有一個20M的變量被全部task共用,此時會在500個task中產生500個副本,耗費集羣10G的內存,若是使用了廣播變量, 那麼每一個Executor保存一個副本,一共消耗400M內存,內存消耗減小了5倍。

廣播變量在每一個Executor保存一個副本,此Executor的全部task共用此廣播變量,這讓變量產生的副本數量大大減小。

在初始階段,廣播變量只在Driver中有一份副本。task在運行的時候,想要使用廣播變量中的數據,此時首先會在本身本地的Executor對應的BlockManager中嘗試獲取變量,若是本地沒有,BlockManager就會從Driver或者其餘節點的BlockManager上遠程拉取變量的複本,並由本地的BlockManager進行管理;以後此Executor的全部task都會直接從本地的BlockManager中獲取變量。

8.5 Kryo序列化

默認狀況下,Spark使用Java的序列化機制。Java的序列化機制使用方便,不須要額外的配置,在算子中使用的變量實現Serializable接口便可,可是,Java序列化機制的效率不高,序列化速度慢而且序列化後的數據所佔用的空間依然較大。

Kryo序列化機制比Java序列化機制性能提升10倍左右,Spark之因此沒有默認使用Kryo做爲序列化類庫,是由於它不支持全部對象的序列化,同時Kryo須要用戶在使用前註冊須要序列化的類型,不夠方便,但從Spark 2.0.0版本開始,簡單類型、簡單類型數組、字符串類型的Shuffling RDDs 已經默認使用Kryo序列化方式了。

  • 自定義類的Kryo序列化註冊方式的實例代碼以下
public class MyKryoRegistrator implements KryoRegistrator
{
  @Override
  public void registerClasses(Kryo kryo)
  {
    kryo.register(StartupReportLogs.class);
  }
}
  • Kryo序列化機制配置代碼以下
//建立SparkConf對象
val conf = new SparkConf().setMaster(…).setAppName(…)
//使用Kryo序列化庫,若是要使用Java序列化庫,須要把該行屏蔽掉
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");  
//在Kryo序列化庫中註冊自定義的類集合,若是要使用Java序列化庫,須要把該行屏蔽掉
conf.set("spark.kryo.registrator", "com.xxx.MyKryoRegistrator");

8.6 調節本地化等待時長

Spark做業運行過程當中,Driver會對每個stage的task進行分配。根據Spark的task分配算法,Spark但願task可以運行在它要計算的數據算在的節點(數據本地化思想),這樣就能夠避免數據的網絡傳輸。一般來講,task可能不會被分配到它處理的數據所在的節點,由於這些節點可用的資源可能已經用盡,此時,Spark會等待一段時間,默認3s,若是等待指定時間後仍然沒法在指定節點運行,那麼會自動降級,嘗試將task分配到比較差的本地化級別所對應的節點上,好比將task分配到離它要計算的數據比較近的一個節點,而後進行計算,若是當前級別仍然不行,那麼繼續降級。

當task要處理的數據不在task所在節點上時,會發生數據的傳輸。task會經過所在節點的BlockManager獲取數據,BlockManager發現數據不在本地時,戶經過網絡傳輸組件從數據所在節點的BlockManager處獲取數據。

網絡傳輸數據的狀況是咱們不肯意看到的,大量的網絡傳輸會嚴重影響性能,所以,咱們但願經過調節本地化等待時長,若是在等待時長這段時間內,目標節點處理完成了一部分task,那麼當前的task將有機會獲得執行,這樣就可以改善Spark做業的總體性能。

  • Spark本地化等級
名稱 解析
PROCESS_LOCAL 進程本地化,task和數據在同一個Executor中,性能最好。
NODE_LOCAL 節點本地化,task和數據在同一個節點中,可是task和數據不在同一個Executor中,數據須要在進程間進行傳輸。
RACK_LOCAL 機架本地化,task和數據在同一個機架的兩個節點上,數據須要經過網絡在節點之間進行傳輸。
NO_PREF 對於task來講,從哪裏獲取都同樣,沒有好壞之分。
ANY task和數據能夠在集羣的任何地方,並且不在一個機架中,性能最差。

在Spark項目開發階段,能夠使用client模式對程序進行測試,此時,能夠在本地看到比較全的日誌信息,日誌信息中有明確的task數據本地化的級別,若是大部分都是PROCESS_LOCAL,那麼就無需進行調節,可是若是發現不少的級別都是NODE_LOCAL、ANY,那麼須要對本地化的等待時長進行調節,經過延長本地化等待時長,看看task的本地化級別有沒有提高,並觀察Spark做業的運行時間有沒有縮短。

注意,過猶不及,不要將本地化等待時長延長地過長,致使由於大量的等待時長,使得Spark做業的運行時間反而增長了。

  • 本地化等待市場設置以下:
val conf = new SparkConf().set("spark.locality.wait", "6")

9 算子調優

9.1 mapPartitions

普通的map算子對RDD中的每個元素進行操做,而mapPartitions算子對RDD中每個分區進行操做。若是是普通的map算子,假設一個partition有1萬條數據,那麼map算子中的function要執行1萬次,也就是對每一個元素進行操做。

若是是mapPartition算子,因爲一個task處理一個RDD的partition,那麼一個task只會執行一次function,function一次接收全部的partition數據,效率比較高。

好比,當要把RDD中的全部數據經過JDBC寫入數據,若是使用map算子,那麼須要對RDD中的每個元素都建立一個數據庫鏈接,這樣對資源的消耗很大,若是使用mapPartitions算子,那麼針對一個分區的數據,只須要創建一個數據庫鏈接。

mapPartitions算子也存在一些缺點:對於普通的map操做,一次處理一條數據,若是在處理了2000條數據後內存不足,那麼能夠將已經處理完的2000條數據從內存中垃圾回收掉;可是若是使用mapPartitions算子,但數據量很是大時,function一次處理一個分區的數據,若是一旦內存不足,此時沒法回收內存,就可能會OOM,即內存溢出。

所以,mapPartitions算子適用於數據量不是特別大的時候,此時使用mapPartitions算子對性能的提高效果仍是不錯的。(當數據量很大的時候,一旦使用mapPartitions算子,就會直接OOM)

在項目中,應該首先估算一下RDD的數據量、每一個partition的數據量,以及分配給每一個Executor的內存資源,若是資源容許,能夠考慮使用mapPartitions算子代替map。

9.2 foreachPartition優化數據庫操做

在生產環境中,一般使用foreachPartition算子來完成數據庫的寫入,經過foreachPartition算子的特性,能夠優化寫數據庫的性能。

若是使用foreach算子完成數據庫的操做,因爲foreach算子是遍歷RDD的每條數據,所以,每條數據都會創建一個數據庫鏈接,這是對資源的極大浪費,所以,對於寫數據庫操做,咱們應當使用foreachPartition算子。

與mapPartitions算子很是類似,foreachPartition是將RDD的每一個分區做爲遍歷對象,一次處理一個分區的數據,也就是說,若是涉及數據庫的相關操做,一個分區的數據只須要建立一次數據庫鏈接

  • 使用了foreachPartition算子後,能夠得到如下的性能提高:
    1. 對於咱們寫的function函數,一次處理一整個分區的數據;
    2. 對於一個分區內的數據,建立惟一的數據庫鏈接;
    3. 只須要向數據庫發送一次SQL語句和多組參數;

在生產環境中,所有都會使用foreachPartition算子完成數據庫操做。foreachPartition算子存在一個問題,與mapPartitions算子相似,若是一個分區的數據量特別大,可能會形成OOM,即內存溢出。

9.3 filter與coalesce的配合使用

在Spark任務中咱們常常會使用filter算子完成RDD中數據的過濾,在任務初始階段,從各個分區中加載到的數據量是相近的,可是一旦進過filter過濾後,每一個分區的數據量有可能會存在較大差別

  • 如上圖咱們能夠發現兩個問題:

    1. 每一個partition的數據量變小了,若是還按照以前與partition相等的task個數去處理當前數據,有點浪費task的計算資源;
    2. 每一個partition的數據量不同,會致使後面的每一個task處理每一個partition數據的時候,每一個task要處理的數據量不一樣,這頗有可能致使數據傾斜問題。
  • 如圖,第二個分區的數據過濾後只剩100條,而第三個分區的數據過濾後剩下800條,在相同的處理邏輯下,第二個分區對應的task處理的數據量與第三個分區對應的task處理的數據量差距達到了8倍,這也會致使運行速度可能存在數倍的差距,這也就是數據傾斜問題。

  • 針對上述的兩個問題,咱們分別進行分析:

    1. 針對第一個問題,既然分區的數據量變小了,咱們但願能夠對分區數據進行從新分配,好比將原來4個分區的數據轉化到2個分區中,這樣只須要用後面的兩個task進行處理便可,避免了資源的浪費。
    2. 針對第二個問題,解決方法和第一個問題的解決方法很是類似,對分區數據從新分配,讓每一個partition中的數據量差很少,這就避免了數據傾斜問題。
  • 那麼具體應該如何實現上面的解決思路?咱們須要coalesce算子。

repartition與coalesce均可以用來進行重分區,其中repartition只是coalesce接口中shuffle爲true的簡易實現,coalesce默認狀況下不進行shuffle,可是能夠經過參數進行設置。

  • 假設咱們但願將本來的分區個數A經過從新分區變爲B,那麼有如下幾種狀況:
  1. A > B(多數分區合併爲少數分區)

① A與B相差值不大

此時使用coalesce便可,無需shuffle過程。

② A與B相差值很大

此時能夠使用coalesce而且不啓用shuffle過程,可是會致使合併過程性能低下,因此推薦設置coalesce的第二個參數爲true,即啓動shuffle過程。

  1. A < B(少數分區分解爲多數分區)

此時使用repartition便可,若是使用coalesce須要將shuffle設置爲true,不然coalesce無效。

咱們能夠在filter操做以後,使用coalesce算子針對每一個partition的數據量各不相同的狀況,壓縮partition的數量,並且讓每一個partition的數據量儘可能均勻緊湊,以便於後面的task進行計算操做,在某種程度上可以在必定程度上提高性能。

注意:local模式是進程內模擬集羣運行,已經對並行度和分區數量有了必定的內部優化,所以不用去設置並行度和分區數量。

9.4 repartition解決SparkSQL低並行度問題

在常規性能調優中咱們講解了並行度的調節策略,可是,並行度的設置對於Spark SQL是不生效的,用戶設置的並行度只對於Spark SQL之外的全部Spark的stage生效。

Spark SQL的並行度不容許用戶本身指定,Spark SQL本身會默認根據hive表對應的HDFS文件的split個數自動設置Spark SQL所在的那個stage的並行度,用戶本身通spark.default.parallelism參數指定的並行度,只會在沒Spark SQL的stage中生效。

因爲Spark SQL所在stage的並行度沒法手動設置,若是數據量較大,而且此stage中後續的transformation操做有着複雜的業務邏輯,而Spark SQL自動設置的task數量不多,這就意味着每一個task要處理爲數很多的數據量,而後還要執行很是複雜的處理邏輯,這就可能表現爲第一個有Spark SQL的stage速度很慢,然後續的沒有Spark SQL的stage運行速度很是快。

爲了解決Spark SQL沒法設置並行度和task數量的問題,咱們能夠使用repartition算子

Spark SQL這一步的並行度和task數量確定是沒有辦法去改變了,可是,對於Spark SQL查詢出來的RDD,當即使用repartition算子,去從新進行分區,這樣能夠從新分區爲多個partition,從repartition以後的RDD操做,因爲再也不設計Spark SQL,所以stage的並行度就會等於你手動設置的值,這樣就避免了Spark SQL所在的stage只能用少許的task去處理大量數據並執行復雜的算法邏輯。使用repartition算子的先後對好比下圖

9.5 reduceByKey本地聚合

reduceByKey相較於普通的shuffle操做一個顯著的特色就是會進行map端的本地聚合,map端會先對本地的數據進行combine操做,而後將數據寫入給下個stage的每一個task建立的文件中,也就是在map端,對每個key對應的value,執行reduceByKey算子函數。reduceByKey算子的執行過程以下圖

  • 使用reduceByKey對性能的提高以下:

    1. 本地聚合後,在map端的數據量變少,減小了磁盤IO,也減小了對磁盤空間的佔用;
    2. 本地聚合後,下一個stage拉取的數據量變少,減小了網絡傳輸的數據量;
    3. 本地聚合後,在reduce端進行數據緩存的內存佔用減小;
    4. 本地聚合後,在reduce端進行聚合的數據量減小。
  • 基於reduceByKey的本地聚合特徵,咱們應該考慮使用reduceByKey代替其餘的shuffle算子,例如groupByKey。reduceByKey與groupByKey的運行原理以下圖

groupByKey不會進行map端的聚合,而是將全部map端的數據shuffle到reduce端,而後在reduce端進行數據的聚合操做。因爲reduceByKey有map端聚合的特性,使得網絡傳輸的數據量減少,所以效率要明顯高於groupByKey。

10 Shuffle調優

10.1 調節map端緩衝區大小

在Spark任務運行過程當中,若是shuffle的map端處理的數據量比較大,可是map端緩衝的大小是固定的,可能會出現map端緩衝數據頻繁spill溢寫到磁盤文件中的狀況,使得性能很是低下,經過調節map端緩衝的大小,能夠避免頻繁的磁盤IO操做,進而提高Spark任務的總體性能。

map端緩衝的默認配置是32KB,若是每一個task處理640KB的數據,那麼會發生640/32 = 20次溢寫,若是每一個task處理64000KB的數據,機會發生64000/32=2000此溢寫,這對於性能的影響是很是嚴重的。

  • map端緩衝的配置方法
val conf = new SparkConf().set("spark.shuffle.file.buffer", "64")

10.2 調節reduce端拉取數據緩衝區大小

Spark Shuffle過程當中,shuffle reduce task的buffer緩衝區大小決定了reduce task每次可以緩衝的數據量,也就是每次可以拉取的數據量,若是內存資源較爲充足,適當增長拉取數據緩衝區的大小,能夠減小拉取數據的次數,也就能夠減小網絡傳輸的次數,進而提高性能。

  • reduce端數據拉取緩衝區的大小能夠經過spark.reducer.maxSizeInFlight參數進行設置,默認爲48MB,該參數的設置方法
val conf = new SparkConf().set("spark.reducer.maxSizeInFlight", "96")

10.3 調節reduce端拉取數據重試次數

Spark Shuffle過程當中,reduce task拉取屬於本身的數據時,若是由於網絡異常等緣由致使失敗會自動進行重試。對於那些包含了特別耗時的shuffle操做的做業,建議增長重試最大次數(好比60次),以免因爲JVM的full gc或者網絡不穩定等因素致使的數據拉取失敗。在實踐中發現,對於針對超大數據量(數十億~上百億)的shuffle過程,調節該參數能夠大幅度提高穩定性。

  • reduce端拉取數據重試次數能夠經過spark.shuffle.io.maxRetries參數進行設置,該參數就表明了能夠重試的最大次數。若是在指定次數以內拉取仍是沒有成功,就可能會致使做業執行失敗,默認爲3,該參數的設置方法
val conf = new SparkConf().set("spark.shuffle.io.maxRetries", "6")

10.4 調節reduce端拉取數據等待時間

Spark Shuffle過程當中,reduce task拉取屬於本身的數據時,若是由於網絡異常等緣由致使失敗會自動進行重試,在一次失敗後,會等待必定的時間間隔再進行重試,能夠經過加大間隔時長(好比60s),以增長shuffle操做的穩定性。

  • reduce端拉取數據等待間隔能夠經過spark.shuffle.io.retryWait參數進行設置,默認值爲5s,該參數的設置方法
val conf = new SparkConf().set("spark.shuffle.io.retryWait", "60s")

10.5 調節SortShuffle排序操做閥值

對於SortShuffleManager,若是shuffle reduce task的數量小於某一閾值則shuffle write過程當中不會進行排序操做,而是直接按照未經優化的HashShuffleManager的方式去寫數據,可是最後會將每一個task產生的全部臨時磁盤文件都合併成一個文件,並會建立單獨的索引文件。

當你使用SortShuffleManager時,若是的確不須要排序操做,那麼建議將這個參數調大一些,大於shuffle read task的數量,那麼此時map-side就不會進行排序了,減小了排序的性能開銷,可是這種方式下,依然會產生大量的磁盤文件,所以shuffle write性能有待提升。

SortShuffleManager排序操做閾值的設置能夠經過spark.shuffle.sort. bypassMergeThreshold這一參數進行設置,默認值爲200

  • 該參數的設置方法以下:
val conf = new SparkConf().set("spark.shuffle.sort.bypassMergeThreshold", "400")

11 JVM調優

11.1 下降cache操做的內存佔比

11.1.1 靜態內存管理機制

根據Spark靜態內存管理機制,堆內存被劃分爲了兩塊,Storage和Execution。Storage主要用於緩存RDD數據和broadcast數據,Execution主要用於緩存在shuffle過程當中產生的中間數據,Storage佔系統內存的60%,Execution佔系統內存的20%,而且二者徹底獨立。

在通常狀況下,Storage的內存都提供給了cache操做,可是若是在某些狀況下cache操做內存不是很緊張,而task的算子中建立的對象不少,Execution內存又相對較小,這回致使頻繁的minor gc,甚至於頻繁的full gc,進而致使Spark頻繁的中止工做,性能影響會很大。

在Spark UI中能夠查看每一個stage的運行狀況,包括每一個task的運行時間、gc時間等等,若是發現gc太頻繁,時間太長,就能夠考慮調節Storage的內存佔比,讓task執行算子函數式,有更多的內存能夠使用。

Storage內存區域能夠經過spark.storage.memoryFraction參數進行指定,默認爲0.6,即60%,能夠逐級向下遞減,如代碼清單所示:

  • 內存佔比設置
val conf = new SparkConf().set("spark.storage.memoryFraction", "0.4")
  • 統一內存管理機制

根據Spark統一內存管理機制,堆內存被劃分爲了兩塊,Storage和Execution。Storage主要用於緩存數據,Execution主要用於緩存在shuffle過程當中產生的中間數據,二者所組成的內存部分稱爲統一內存,Storage和Execution各佔統一內存的50%,因爲動態佔用機制的實現,shuffle過程須要的內存過大時,會自動佔用Storage的內存區域,所以無需手動進行調節。

11.1.2 調節Executor堆外內存

Executor的堆外內存主要用於程序的共享庫、Perm Space、 線程Stack和一些Memory mapping等, 或者類C方式allocate object。

有時,若是你的Spark做業處理的數據量很是大,達到幾億的數據量,此時運行Spark做業會時不時地報錯,例如shuffle output file cannot find,executor lost,task lost,out of memory等,這多是Executor的堆外內存不太夠用,致使Executor在運行的過程當中內存溢出。

stage的task在運行的時候,可能要從一些Executor中去拉取shuffle map output文件,可是Executor可能已經因爲內存溢出掛掉了,其關聯的BlockManager也沒有了,這就可能會報出shuffle output file cannot find,executor lost,task lost,out of memory等錯誤,此時,就能夠考慮調節一下Executor的堆外內存,也就能夠避免報錯,與此同時,堆外內存調節的比較大的時候,對於性能來說,也會帶來必定的提高。

默認狀況下,Executor堆外內存上限大概爲300多MB,在實際的生產環境下,對海量數據進行處理的時候,這裏都會出現問題,致使Spark做業反覆崩潰,沒法運行,此時就會去調節這個參數,到至少1G,甚至於2G、4G。

  • Executor堆外內存的配置須要在spark-submit腳本里配置
--conf spark.yarn.executor.memoryOverhead=2048

11.1.3 調節連接等待時長

在Spark做業運行過程當中,Executor優先從本身本地關聯的BlockManager中獲取某份數據,若是本地BlockManager沒有的話,會經過TransferService遠程鏈接其餘節點上Executor的BlockManager來獲取數據。

若是task在運行過程當中建立大量對象或者建立的對象較大,會佔用大量的內存,這回致使頻繁的垃圾回收,可是垃圾回收會致使工做現場所有中止,也就是說,垃圾回收一旦執行,Spark的Executor進程就會中止工做,沒法提供相應,此時,因爲沒有響應,沒法創建網絡鏈接,會致使網絡鏈接超時。

在生產環境下,有時會遇到file not found、file lost這類錯誤,在這種狀況下,頗有多是Executor的BlockManager在拉取數據的時候,沒法創建鏈接,而後超過默認的鏈接等待時長60s後,宣告數據拉取失敗,若是反覆嘗試都拉取不到數據,可能會致使Spark做業的崩潰。這種狀況也可能會致使DAGScheduler反覆提交幾回stage,TaskScheduler返回提交幾回task,大大延長了咱們的Spark做業的運行時間。

調節鏈接等待時長後,一般能夠避免部分的XX文件拉取失敗、XX文件lost等報錯。

  • 此時,能夠考慮調節鏈接的超時時長,鏈接等待時長鬚要在spark-submit腳本中進行設置,設置方式以下
--conf spark.core.connection.ack.wait.timeout=300

12 故障排除

12.1 控制reduce端緩衝大小以及避免OOM

在Shuffle過程,reduce端task並非等到map端task將其數據所有寫入磁盤後再去拉取,而是map端寫一點數據,reduce端task就會拉取一小部分數據,而後當即進行後面的聚合、算子函數的使用等操做。

reduce端task可以拉取多少數據,由reduce拉取數據的緩衝區buffer來決定,由於拉取過來的數據都是先放在buffer中,而後再進行後續的處理,buffer的默認大小爲48MB。

reduce端task會一邊拉取一邊計算,不必定每次都會拉滿48MB的數據,可能大多數時候拉取一部分數據就處理掉了。

雖說增大reduce端緩衝區大小能夠減小拉取次數,提高Shuffle性能,可是有時map端的數據量很是大,寫出的速度很是快,此時reduce端的全部task在拉取的時候,有可能所有達到本身緩衝的最大極限值,即48MB,此時,再加上reduce端執行的聚合函數的代碼,可能會建立大量的對象,這可難會致使內存溢出,即OOM。

若是一旦出現reduce端內存溢出的問題,咱們能夠考慮減少reduce端拉取數據緩衝區的大小,例如減小爲12MB。

在實際生產環境中是出現過這種問題的,這是典型的以性能換執行的原理。reduce端拉取數據的緩衝區減少,不容易致使OOM,可是相應的,reudce端的拉取次數增長,形成更多的網絡傳輸開銷,形成性能的降低。

​ 注意,要保證任務可以運行,再考慮性能的優化。

12.2 JVM GC致使的shuffle文件拉取失敗

在Spark做業中,有時會出現shuffle file not found的錯誤,這是很是常見的一個報錯,有時出現這種錯誤之後,選擇從新執行一遍,就再也不報出這種錯誤。

出現上述問題可能的緣由是Shuffle操做中,後面stage的task想要去上一個stage的task所在的Executor拉取數據,結果對方正在執行GC,執行GC會致使Executor內全部的工做現場所有中止,好比BlockManager、基於netty的網絡通訊等,這就會致使後面的task拉取數據拉取了半天都沒有拉取到,就會報出shuffle file not found的錯誤,而第二次再次執行就不會再出現這種錯誤。

能夠經過調整reduce端拉取數據重試次數和reduce端拉取數據時間間隔這兩個參數來對Shuffle性能進行調整,增大參數值,使得reduce端拉取數據的重試次數增長,而且每次失敗後等待的時間間隔加長。

  • JVM GC致使的shuffle文件拉取失敗
val conf = new SparkConf()
  .set("spark.shuffle.io.maxRetries", "60")
  .set("spark.shuffle.io.retryWait", "60s")

12.3 解決各類序列化致使的報錯

當Spark做業在運行過程當中報錯,並且報錯信息中含有Serializable等相似詞彙,那麼多是序列化問題致使的報錯。

  • 序列化問題要注意如下三點:
    1. 做爲RDD的元素類型的自定義類,必須是能夠序列化的;
    2. 算子函數裏能夠使用的外部的自定義變量,必須是能夠序列化的;
    3. 不能夠在RDD的元素類型、算子函數裏使用第三方的不支持序列化的類型,例如Connection。

12.4 解決算子函數返回NULL致使的問題

在一些算子函數裏,須要咱們有一個返回值,可是在一些狀況下咱們不但願有返回值,此時咱們若是直接返回NULL,會報錯,例如Scala.Math(NULL)異常。

  • 若是你遇到某些狀況,不但願有返回值,那麼能夠經過下述方式解決:
    1. 返回特殊值,不返回NULL,例如「-1」;
    2. 在經過算子獲取到了一個RDD以後,能夠對這個RDD執行filter操做,進行數據過濾,將數值爲-1的數據給過濾掉;
    3. 在使用完filter算子後,繼續調用coalesce算子進行優化。

12.5 解決YARN-CLIENT模式致使的網卡流量激增問題

  • YARN-client模式的運行原理圖

在YARN-client模式下,Driver啓動在本地機器上,而Driver負責全部的任務調度,須要與YARN集羣上的多個Executor進行頻繁的通訊

假設有100個Executor, 1000個task,那麼每一個Executor分配到10個task,以後,Driver要頻繁地跟Executor上運行的1000個task進行通訊,通訊數據很是多,而且通訊品類特別高。這就致使有可能在Spark任務運行過程當中,因爲頻繁大量的網絡通信,本地機器的網卡流量會激增。

注意,YARN-client模式只會在測試環境中使用,而之因此使用YARN-client模式,是因爲能夠看到詳細全面的log信息,經過查看log,能夠鎖定程序中存在的問題,避免在生產環境下發生故障。

在生產環境下,使用的必定是YARN-cluster模式。在YARN-cluster模式下,就不會形成本地機器網卡流量激增問題,若是YARN-cluster模式下存在網絡通訊的問題,須要運維團隊進行解決。

12.6 解決YARN-CLUSTER模式的JVM棧內存溢出沒法執行問題

  • YARN-cluster模式的運行原理圖

當Spark做業中包含SparkSQL的內容時,可能會碰到YARN-client模式下能夠運行,可是YARN-cluster模式下沒法提交運行(報出OOM錯誤)的狀況。

YARN-client模式下,Driver是運行在本地機器上的,Spark使用的JVM的PermGen的配置,是本地機器上的spark-class文件,JVM永久代的大小是128MB,這個是沒有問題的,可是在YARN-cluster模式下,Driver運行在YARN集羣的某個節點上,使用的是沒有通過配置的默認設置,PermGen永久代大小爲82MB。

SparkSQL的內部要進行很複雜的SQL的語義解析、語法樹轉換等等,很是複雜,若是sql語句自己就很是複雜,那麼頗有可能會致使性能的損耗和內存的佔用,特別是對PermGen的佔用會比較大。

因此,此時若是PermGen的佔用好過了82MB,可是又小於128MB,就會出現YARN-client模式下能夠運行,YARN-cluster模式下沒法運行的狀況。

  • 解決上述問題的方法時增長PermGen的容量,須要在spark-submit腳本中對相關參數進行設置,設置方法下
--conf spark.driver.extraJavaOptions="-XX:PermSize=128M -XX:MaxPermSize=256M"

經過上述方法就設置了Driver永久代的大小,默認爲128MB,最大256MB,這樣就能夠避免上面所說的問題。

12.7 解決SparkSQL致使的JVM棧內存溢出

當SparkSQL的sql語句有成百上千的or關鍵字時,就可能會出現Driver端的JVM棧內存溢出。

JVM棧內存溢出基本上就是因爲調用的方法層級過多,產生了大量的,很是深的,超出了JVM棧深度限制的遞歸。(咱們猜想SparkSQL有大量or語句的時候,在解析SQL時,例如轉換爲語法樹或者進行執行計劃的生成的時候,對於or的處理是遞歸,or很是多時,會發生大量的遞歸)

此時,建議將一條sql語句拆分爲多條sql語句來執行,每條sql語句儘可能保證100個之內的子句。根據實際的生產環境試驗,一條sql語句的or關鍵字控制在100個之內,一般不會致使JVM棧內存溢出。

12.8 持久化與checkpoint的使用

Spark持久化在大部分狀況下是沒有問題的,可是有時數據可能會丟失,若是數據一旦丟失,就須要對丟失的數據從新進行計算,計算完後再緩存和使用,爲了不數據的丟失,能夠選擇對這個RDD進行checkpoint,也就是將數據持久化一份到容錯的文件系統上(好比HDFS)。

一個RDD緩存並checkpoint後,若是一旦發現緩存丟失,就會優先查看checkpoint數據存不存在,若是有,就會使用checkpoint數據,而不用從新計算。也便是說,checkpoint能夠視爲cache的保障機制,若是cache失敗,就使用checkpoint的數據。

使用checkpoint的優勢在於提升了Spark做業的可靠性,一旦緩存出現問題,不須要從新計算數據,缺點在於,checkpoint時須要將數據寫入HDFS等文件系統,對性能的消耗較大。

13 數據傾斜

參考:http://www.javashuo.com/article/p-diuzpkoa-my.html

有的時候,咱們可能會遇到大數據計算中一個最棘手的問題——數據傾斜,此時Spark做業的性能會比指望差不少。數據傾斜調優,就是使用各類技術方案解決不一樣類型的數據傾斜問題,以保證Spark做業的性能。

13.1 數據傾斜發生時的現象

  • 絕大多數task執行得都很是快,但個別task執行極慢。好比,總共有1000個task,997個task都在1分鐘以內執行完了,可是剩餘兩三個task卻要一兩個小時。這種狀況很常見。
  • 本來可以正常執行的Spark做業,某天忽然報出OOM(內存溢出)異常,觀察異常棧,是咱們寫的業務代碼形成的。這種狀況比較少見。

13.2 數據傾斜發生的原理

數據傾斜的原理很簡單:在進行shuffle的時候,必須將各個節點上相同的key拉取到某個節點上的一個task來進行處理,好比按照key進行聚合或join等操做。此時若是某個key對應的數據量特別大的話,就會發生數據傾斜。好比大部分key對應10條數據,可是個別key卻對應了100萬條數據,那麼大部分task可能就只會分配到10條數據,而後1秒鐘就運行完了;可是個別task可能分配到了100萬數據,要運行一兩個小時。所以,整個Spark做業的運行進度是由運行時間最長的那個task決定的。

所以出現數據傾斜的時候,Spark做業看起來會運行得很是緩慢,甚至可能由於某個task處理的數據量過大致使內存溢出。

下圖就是一個很清晰的例子:hello這個key,在三個節點上對應了總共7條數據,這些數據都會被拉取到同一個task中進行處理;而world和you這兩個key分別纔對應1條數據,因此另外兩個task只要分別處理1條數據便可。此時第一個task的運行時間多是另外兩個task的7倍,而整個stage的運行速度也由運行最慢的那個task所決定。

13.3 如何定位致使數據傾斜的代碼

數據傾斜只會發生在shuffle過程當中。這裏給你們羅列一些經常使用的而且可能會觸發shuffle操做的算子:distinct、groupByKey、reduceByKey、aggregateByKey、join、cogroup、repartition等。出現數據傾斜時,可能就是你的代碼中使用了這些算子中的某一個所致使的。

13.4 數據傾斜的解決方案

13.4.1 解決方案一:使用Hive ETL預處理數據

方案適用場景:致使數據傾斜的是Hive表。若是該Hive表中的數據自己很不均勻(好比某個key對應了100萬數據,其餘key纔對應了10條數據),並且業務場景須要頻繁使用Spark對Hive表執行某個分析操做,那麼比較適合使用這種技術方案。

方案實現思路:此時能夠評估一下,是否能夠經過Hive來進行數據預處理(即經過Hive ETL預先對數據按照key進行聚合,或者是預先和其餘表進行join),而後在Spark做業中針對的數據源就不是原來的Hive表了,而是預處理後的Hive表。此時因爲數據已經預先進行過聚合或join操做了,那麼在Spark做業中也就不須要使用原先的shuffle類算子執行這類操做了。

方案實現原理:這種方案從根源上解決了數據傾斜,由於完全避免了在Spark中執行shuffle類算子,那麼確定就不會有數據傾斜的問題了。可是這裏也要提醒一下你們,這種方式屬於治標不治本。由於畢竟數據自己就存在分佈不均勻的問題,因此Hive ETL中進行group by或者join等shuffle操做時,仍是會出現數據傾斜,致使Hive ETL的速度很慢。咱們只是把數據傾斜的發生提早到了Hive ETL中,避免Spark程序發生數據傾斜而已。

方案優勢:實現起來簡單便捷,效果還很是好,徹底規避掉了數據傾斜,Spark做業的性能會大幅度提高。

方案缺點:治標不治本,Hive ETL中仍是會發生數據傾斜。

方案實踐經驗:在一些Java系統與Spark結合使用的項目中,會出現Java代碼頻繁調用Spark做業的場景,並且對Spark做業的執行性能要求很高,就比較適合使用這種方案。將數據傾斜提早到上游的Hive ETL,天天僅執行一次,只有那一次是比較慢的,而以後每次Java調用Spark做業時,執行速度都會很快,可以提供更好的用戶體驗。

項目實踐經驗:在美團·點評的交互式用戶行爲分析系統中使用了這種方案,該系統主要是容許用戶經過Java Web系統提交數據分析統計任務,後端經過Java提交Spark做業進行數據分析統計。要求Spark做業速度必需要快,儘可能在10分鐘之內,不然速度太慢,用戶體驗會不好。因此咱們將有些Spark做業的shuffle操做提早到了Hive ETL中,從而讓Spark直接使用預處理的Hive中間表,儘量地減小Spark的shuffle操做,大幅度提高了性能,將部分做業的性能提高了6倍以上。

13.4.2 解決方案二:過濾少數致使傾斜的key

方案適用場景:若是發現致使傾斜的key就少數幾個,並且對計算自己的影響並不大的話,那麼很適合使用這種方案。好比99%的key就對應10條數據,可是隻有一個key對應了100萬數據,從而致使了數據傾斜。

方案實現思路:若是咱們判斷那少數幾個數據量特別多的key,對做業的執行和計算結果不是特別重要的話,那麼幹脆就直接過濾掉那少數幾個key。好比,在Spark SQL中能夠使用where子句過濾掉這些key或者在Spark Core中對RDD執行filter算子過濾掉這些key。若是須要每次做業執行時,動態斷定哪些key的數據量最多而後再進行過濾,那麼能夠使用sample算子對RDD進行採樣,而後計算出每一個key的數量,取數據量最多的key過濾掉便可。

方案實現原理:將致使數據傾斜的key給過濾掉以後,這些key就不會參與計算了,天然不可能產生數據傾斜。

方案優勢:實現簡單,並且效果也很好,能夠徹底規避掉數據傾斜。

方案缺點:適用場景很少,大多數狀況下,致使傾斜的key仍是不少的,並非只有少數幾個。

方案實踐經驗:在項目中咱們也採用過這種方案解決數據傾斜。有一次發現某一天Spark做業在運行的時候忽然OOM了,追查以後發現,是Hive表中的某一個key在那天數據異常,致使數據量暴增。所以就採起每次執行前先進行採樣,計算出樣本中數據量最大的幾個key以後,直接在程序中將那些key給過濾掉。

13.4.3 解決方案三:提升shuffle操做的並行度

方案適用場景:若是咱們必需要對數據傾斜迎難而上,那麼建議優先使用這種方案,由於這是處理數據傾斜最簡單的一種方案。

方案實現思路:在對RDD執行shuffle算子時,給shuffle算子傳入一個參數,好比reduceByKey(1000),該參數就設置了這個shuffle算子執行時shuffle read task的數量。對於Spark SQL中的shuffle類語句,好比group by、join等,須要設置一個參數,即spark.sql.shuffle.partitions,該參數表明了shuffle read task的並行度,該值默認是200,對於不少場景來講都有點太小。

方案實現原理:增長shuffle read task的數量,可讓本來分配給一個task的多個key分配給多個task,從而讓每一個task處理比原來更少的數據。舉例來講,若是本來有5個key,每一個key對應10條數據,這5個key都是分配給一個task的,那麼這個task就要處理50條數據。而增長了shuffle read task之後,每一個task就分配到一個key,即每一個task就處理10條數據,那麼天然每一個task的執行時間都會變短了。具體原理以下圖所示。

方案優勢:實現起來比較簡單,能夠有效緩解和減輕數據傾斜的影響。

方案缺點:只是緩解了數據傾斜而已,沒有完全根除問題,根據實踐經驗來看,其效果有限。

方案實踐經驗:該方案一般沒法完全解決數據傾斜,由於若是出現一些極端狀況,好比某個key對應的數據量有100萬,那麼不管你的task數量增長到多少,這個對應着100萬數據的key確定仍是會分配到一個task中去處理,所以註定仍是會發生數據傾斜的。因此這種方案只能說是在發現數據傾斜時嘗試使用的第一種手段,嘗試去用嘴簡單的方法緩解數據傾斜而已,或者是和其餘方案結合起來使用。

13.4.4 解決方案四:兩階段聚合(局部聚合+全局聚合)

方案適用場景:對RDD執行reduceByKey等聚合類shuffle算子或者在Spark SQL中使用group by語句進行分組聚合時,比較適用這種方案。

方案實現思路:這個方案的核心實現思路就是進行兩階段聚合。第一次是局部聚合,先給每一個key都打上一個隨機數,好比10之內的隨機數,此時原先同樣的key就變成不同的了,好比(hello, 1) (hello, 1) (hello, 1) (hello, 1),就會變成(1_hello, 1) (1_hello, 1) (2_hello, 1) (2_hello, 1)。接着對打上隨機數後的數據,執行reduceByKey等聚合操做,進行局部聚合,那麼局部聚合結果,就會變成了(1_hello, 2) (2_hello, 2)。而後將各個key的前綴給去掉,就會變成(hello,2)(hello,2),再次進行全局聚合操做,就能夠獲得最終結果了,好比(hello, 4)。

方案實現原理:將本來相同的key經過附加隨機前綴的方式,變成多個不一樣的key,就可讓本來被一個task處理的數據分散到多個task上去作局部聚合,進而解決單個task處理數據量過多的問題。接着去除掉隨機前綴,再次進行全局聚合,就能夠獲得最終的結果。具體原理見下圖。

方案優勢:對於聚合類的shuffle操做致使的數據傾斜,效果是很是不錯的。一般均可以解決掉數據傾斜,或者至少是大幅度緩解數據傾斜,將Spark做業的性能提高數倍以上。

方案缺點:僅僅適用於聚合類的shuffle操做,適用範圍相對較窄。若是是join類的shuffle操做,還得用其餘的解決方案。

// 第一步,給RDD中的每一個key都打上一個隨機前綴。
JavaPairRDD<String, Long> randomPrefixRdd = rdd.mapToPair(
        new PairFunction<Tuple2<Long,Long>, String, Long>() {
            private static final long serialVersionUID = 1L;
            @Override
            public Tuple2<String, Long> call(Tuple2<Long, Long> tuple)
                    throws Exception {
                Random random = new Random();
                int prefix = random.nextInt(10);
                return new Tuple2<String, Long>(prefix + "_" + tuple._1, tuple._2);
            }
        });

// 第二步,對打上隨機前綴的key進行局部聚合。
JavaPairRDD<String, Long> localAggrRdd = randomPrefixRdd.reduceByKey(
        new Function2<Long, Long, Long>() {
            private static final long serialVersionUID = 1L;
            @Override
            public Long call(Long v1, Long v2) throws Exception {
                return v1 + v2;
            }
        });

// 第三步,去除RDD中每一個key的隨機前綴。
JavaPairRDD<Long, Long> removedRandomPrefixRdd = localAggrRdd.mapToPair(
        new PairFunction<Tuple2<String,Long>, Long, Long>() {
            private static final long serialVersionUID = 1L;
            @Override
            public Tuple2<Long, Long> call(Tuple2<String, Long> tuple)
                    throws Exception {
                long originalKey = Long.valueOf(tuple._1.split("_")[1]);
                return new Tuple2<Long, Long>(originalKey, tuple._2);
            }
        });

// 第四步,對去除了隨機前綴的RDD進行全局聚合。
JavaPairRDD<Long, Long> globalAggrRdd = removedRandomPrefixRdd.reduceByKey(
        new Function2<Long, Long, Long>() {
            private static final long serialVersionUID = 1L;
            @Override
            public Long call(Long v1, Long v2) throws Exception {
                return v1 + v2;
            }
        });

13.4.5 解決方案五:將reduce join轉爲map join

方案適用場景:在對RDD使用join類操做,或者是在Spark SQL中使用join語句時,並且join操做中的一個RDD或表的數據量比較小(好比幾百M或者一兩G),比較適用此方案。

方案實現思路:不使用join算子進行鏈接操做,而使用Broadcast變量與map類算子實現join操做,進而徹底規避掉shuffle類的操做,完全避免數據傾斜的發生和出現。將較小RDD中的數據直接經過collect算子拉取到Driver端的內存中來,而後對其建立一個Broadcast變量;接着對另一個RDD執行map類算子,在算子函數內,從Broadcast變量中獲取較小RDD的全量數據,與當前RDD的每一條數據按照鏈接key進行比對,若是鏈接key相同的話,那麼就將兩個RDD的數據用你須要的方式鏈接起來。

方案實現原理:普通的join是會走shuffle過程的,而一旦shuffle,就至關於會將相同key的數據拉取到一個shuffle read task中再進行join,此時就是reduce join。可是若是一個RDD是比較小的,則能夠採用廣播小RDD全量數據+map算子來實現與join一樣的效果,也就是map join,此時就不會發生shuffle操做,也就不會發生數據傾斜。具體原理以下圖所示。

方案優勢:對join操做致使的數據傾斜,效果很是好,由於根本就不會發生shuffle,也就根本不會發生數據傾斜。

方案缺點:適用場景較少,由於這個方案只適用於一個大表和一個小表的狀況。畢竟咱們須要將小表進行廣播,此時會比較消耗內存資源,driver和每一個Executor內存中都會駐留一份小RDD的全量數據。若是咱們廣播出去的RDD數據比較大,好比10G以上,那麼就可能發生內存溢出了。所以並不適合兩個都是大表的狀況。

// 首先將數據量比較小的RDD的數據,collect到Driver中來。
List<Tuple2<Long, Row>> rdd1Data = rdd1.collect()
// 而後使用Spark的廣播功能,將小RDD的數據轉換成廣播變量,這樣每一個Executor就只有一份RDD的數據。
// 能夠儘量節省內存空間,而且減小網絡傳輸性能開銷。
final Broadcast<List<Tuple2<Long, Row>>> rdd1DataBroadcast = sc.broadcast(rdd1Data);

// 對另一個RDD執行map類操做,而再也不是join類操做。
JavaPairRDD<String, Tuple2<String, Row>> joinedRdd = rdd2.mapToPair(
        new PairFunction<Tuple2<Long,String>, String, Tuple2<String, Row>>() {
            private static final long serialVersionUID = 1L;
            @Override
            public Tuple2<String, Tuple2<String, Row>> call(Tuple2<Long, String> tuple)
                    throws Exception {
                // 在算子函數中,經過廣播變量,獲取到本地Executor中的rdd1數據。
                List<Tuple2<Long, Row>> rdd1Data = rdd1DataBroadcast.value();
                // 能夠將rdd1的數據轉換爲一個Map,便於後面進行join操做。
                Map<Long, Row> rdd1DataMap = new HashMap<Long, Row>();
                for(Tuple2<Long, Row> data : rdd1Data) {
                    rdd1DataMap.put(data._1, data._2);
                }
                // 獲取當前RDD數據的key以及value。
                String key = tuple._1;
                String value = tuple._2;
                // 從rdd1數據Map中,根據key獲取到能夠join到的數據。
                Row rdd1Value = rdd1DataMap.get(key);
                return new Tuple2<String, String>(key, new Tuple2<String, Row>(value, rdd1Value));
            }
        });

// 這裏得提示一下。
// 上面的作法,僅僅適用於rdd1中的key沒有重複,所有是惟一的場景。
// 若是rdd1中有多個相同的key,那麼就得用flatMap類的操做,在進行join的時候不能用map,而是得遍歷rdd1全部數據進行join。
// rdd2中每條數據均可能會返回多條join後的數據。

13.4.6 解決方案六:採樣傾斜key並分拆join操做

方案適用場景:兩個RDD/Hive表進行join的時候,若是數據量都比較大,沒法採用「解決方案五」,那麼此時能夠看一下兩個RDD/Hive表中的key分佈狀況。若是出現數據傾斜,是由於其中某一個RDD/Hive表中的少數幾個key的數據量過大,而另外一個RDD/Hive表中的全部key都分佈比較均勻,那麼採用這個解決方案是比較合適的。

方案實現思路

對包含少數幾個數據量過大的key的那個RDD,經過sample算子採樣出一份樣原本,而後統計一下每一個key的數量,計算出來數據量最大的是哪幾個key。
而後將這幾個key對應的數據從原來的RDD中拆分出來,造成一個單獨的RDD,並給每一個key都打上n之內的隨機數做爲前綴,而不會致使傾斜的大部分key造成另一個RDD。
接着將須要join的另外一個RDD,也過濾出來那幾個傾斜key對應的數據並造成一個單獨的RDD,將每條數據膨脹成n條數據,這n條數據都按順序附加一個0~n的前綴,不會致使傾斜的大部分key也造成另一個RDD。
再將附加了隨機前綴的獨立RDD與另外一個膨脹n倍的獨立RDD進行join,此時就能夠將原先相同的key打散成n份,分散到多個task中去進行join了。
而另外兩個普通的RDD就照常join便可。
最後將兩次join的結果使用union算子合併起來便可,就是最終的join結果。

方案實現原理:對於join致使的數據傾斜,若是隻是某幾個key致使了傾斜,能夠將少數幾個key分拆成獨立RDD,並附加隨機前綴打散成n份去進行join,此時這幾個key對應的數據就不會集中在少數幾個task上,而是分散到多個task進行join了。具體原理見下圖。

方案優勢:對於join致使的數據傾斜,若是隻是某幾個key致使了傾斜,採用該方式能夠用最有效的方式打散key進行join。並且只須要針對少數傾斜key對應的數據進行擴容n倍,不須要對全量數據進行擴容。避免了佔用過多內存。

方案缺點:若是致使傾斜的key特別多的話,好比成千上萬個key都致使數據傾斜,那麼這種方式也不適合。

// 首先從包含了少數幾個致使數據傾斜key的rdd1中,採樣10%的樣本數據。
JavaPairRDD<Long, String> sampledRDD = rdd1.sample(false, 0.1);

// 對樣本數據RDD統計出每一個key的出現次數,並按出現次數降序排序。
// 對降序排序後的數據,取出top 1或者top 100的數據,也就是key最多的前n個數據。
// 具體取出多少個數據量最多的key,由你們本身決定,咱們這裏就取1個做爲示範。
JavaPairRDD<Long, Long> mappedSampledRDD = sampledRDD.mapToPair(
        new PairFunction<Tuple2<Long,String>, Long, Long>() {
            private static final long serialVersionUID = 1L;
            @Override
            public Tuple2<Long, Long> call(Tuple2<Long, String> tuple)
                    throws Exception {
                return new Tuple2<Long, Long>(tuple._1, 1L);
            }     
        });
JavaPairRDD<Long, Long> countedSampledRDD = mappedSampledRDD.reduceByKey(
        new Function2<Long, Long, Long>() {
            private static final long serialVersionUID = 1L;
            @Override
            public Long call(Long v1, Long v2) throws Exception {
                return v1 + v2;
            }
        });
JavaPairRDD<Long, Long> reversedSampledRDD = countedSampledRDD.mapToPair( 
        new PairFunction<Tuple2<Long,Long>, Long, Long>() {
            private static final long serialVersionUID = 1L;
            @Override
            public Tuple2<Long, Long> call(Tuple2<Long, Long> tuple)
                    throws Exception {
                return new Tuple2<Long, Long>(tuple._2, tuple._1);
            }
        });
final Long skewedUserid = reversedSampledRDD.sortByKey(false).take(1).get(0)._2;

// 從rdd1中分拆出致使數據傾斜的key,造成獨立的RDD。
JavaPairRDD<Long, String> skewedRDD = rdd1.filter(
        new Function<Tuple2<Long,String>, Boolean>() {
            private static final long serialVersionUID = 1L;
            @Override
            public Boolean call(Tuple2<Long, String> tuple) throws Exception {
                return tuple._1.equals(skewedUserid);
            }
        });
// 從rdd1中分拆出不致使數據傾斜的普通key,造成獨立的RDD。
JavaPairRDD<Long, String> commonRDD = rdd1.filter(
        new Function<Tuple2<Long,String>, Boolean>() {
            private static final long serialVersionUID = 1L;
            @Override
            public Boolean call(Tuple2<Long, String> tuple) throws Exception {
                return !tuple._1.equals(skewedUserid);
            } 
        });

// rdd2,就是那個全部key的分佈相對較爲均勻的rdd。
// 這裏將rdd2中,前面獲取到的key對應的數據,過濾出來,分拆成單獨的rdd,並對rdd中的數據使用flatMap算子都擴容100倍。
// 對擴容的每條數據,都打上0~100的前綴。
JavaPairRDD<String, Row> skewedRdd2 = rdd2.filter(
         new Function<Tuple2<Long,Row>, Boolean>() {
            private static final long serialVersionUID = 1L;
            @Override
            public Boolean call(Tuple2<Long, Row> tuple) throws Exception {
                return tuple._1.equals(skewedUserid);
            }
        }).flatMapToPair(new PairFlatMapFunction<Tuple2<Long,Row>, String, Row>() {
            private static final long serialVersionUID = 1L;
            @Override
            public Iterable<Tuple2<String, Row>> call(
                    Tuple2<Long, Row> tuple) throws Exception {
                Random random = new Random();
                List<Tuple2<String, Row>> list = new ArrayList<Tuple2<String, Row>>();
                for(int i = 0; i < 100; i++) {
                    list.add(new Tuple2<String, Row>(i + "_" + tuple._1, tuple._2));
                }
                return list;
            }

        });

// 將rdd1中分拆出來的致使傾斜的key的獨立rdd,每條數據都打上100之內的隨機前綴。
// 而後將這個rdd1中分拆出來的獨立rdd,與上面rdd2中分拆出來的獨立rdd,進行join。
JavaPairRDD<Long, Tuple2<String, Row>> joinedRDD1 = skewedRDD.mapToPair(
        new PairFunction<Tuple2<Long,String>, String, String>() {
            private static final long serialVersionUID = 1L;
            @Override
            public Tuple2<String, String> call(Tuple2<Long, String> tuple)
                    throws Exception {
                Random random = new Random();
                int prefix = random.nextInt(100);
                return new Tuple2<String, String>(prefix + "_" + tuple._1, tuple._2);
            }
        })
        .join(skewedUserid2infoRDD)
        .mapToPair(new PairFunction<Tuple2<String,Tuple2<String,Row>>, Long, Tuple2<String, Row>>() {
                        private static final long serialVersionUID = 1L;
                        @Override
                        public Tuple2<Long, Tuple2<String, Row>> call(
                            Tuple2<String, Tuple2<String, Row>> tuple)
                            throws Exception {
                            long key = Long.valueOf(tuple._1.split("_")[1]);
                            return new Tuple2<Long, Tuple2<String, Row>>(key, tuple._2);
                        }
                    });

// 將rdd1中分拆出來的包含普通key的獨立rdd,直接與rdd2進行join。
JavaPairRDD<Long, Tuple2<String, Row>> joinedRDD2 = commonRDD.join(rdd2);

// 將傾斜key join後的結果與普通key join後的結果,uinon起來。
// 就是最終的join結果。
JavaPairRDD<Long, Tuple2<String, Row>> joinedRDD = joinedRDD1.union(joinedRDD2);

13.4.7 解決方案七:使用隨機前綴和擴容RDD進行join

方案適用場景:若是在進行join操做時,RDD中有大量的key致使數據傾斜,那麼進行分拆key也沒什麼意義,此時就只能使用最後一種方案來解決問題了。

方案實現思路
該方案的實現思路基本和「解決方案六」相似,首先查看RDD/Hive表中的數據分佈狀況,找到那個形成數據傾斜的RDD/Hive表,好比有多個key都對應了超過1萬條數據。
而後將該RDD的每條數據都打上一個n之內的隨機前綴。
同時對另一個正常的RDD進行擴容,將每條數據都擴容成n條數據,擴容出來的每條數據都依次打上一個0~n的前綴。
最後將兩個處理後的RDD進行join便可。

方案實現原理:將原先同樣的key經過附加隨機前綴變成不同的key,而後就能夠將這些處理後的「不一樣key」分散到多個task中去處理,而不是讓一個task處理大量的相同key。該方案與「解決方案六」的不一樣之處就在於,上一種方案是儘可能只對少數傾斜key對應的數據進行特殊處理,因爲處理過程須要擴容RDD,所以上一種方案擴容RDD後對內存的佔用並不大;而這一種方案是針對有大量傾斜key的狀況,無法將部分key拆分出來進行單獨處理,所以只能對整個RDD進行數據擴容,對內存資源要求很高。

方案優勢:對join類型的數據傾斜基本均可以處理,並且效果也相對比較顯著,性能提高效果很是不錯。

方案缺點:該方案更多的是緩解數據傾斜,而不是完全避免數據傾斜。並且須要對整個RDD進行擴容,對內存資源要求很高。

方案實踐經驗:曾經開發一個數據需求的時候,發現一個join致使了數據傾斜。優化以前,做業的執行時間大約是60分鐘左右;使用該方案優化以後,執行時間縮短到10分鐘左右,性能提高了6倍。

// 首先將其中一個key分佈相對較爲均勻的RDD膨脹100倍。
JavaPairRDD<String, Row> expandedRDD = rdd1.flatMapToPair(
        new PairFlatMapFunction<Tuple2<Long,Row>, String, Row>() {
            private static final long serialVersionUID = 1L;
            @Override
            public Iterable<Tuple2<String, Row>> call(Tuple2<Long, Row> tuple)
                    throws Exception {
                List<Tuple2<String, Row>> list = new ArrayList<Tuple2<String, Row>>();
                for(int i = 0; i < 100; i++) {
                    list.add(new Tuple2<String, Row>(0 + "_" + tuple._1, tuple._2));
                }
                return list;
            }
        });

// 其次,將另外一個有數據傾斜key的RDD,每條數據都打上100之內的隨機前綴。
JavaPairRDD<String, String> mappedRDD = rdd2.mapToPair(
        new PairFunction<Tuple2<Long,String>, String, String>() {
            private static final long serialVersionUID = 1L;
            @Override
            public Tuple2<String, String> call(Tuple2<Long, String> tuple)
                    throws Exception {
                Random random = new Random();
                int prefix = random.nextInt(100);
                return new Tuple2<String, String>(prefix + "_" + tuple._1, tuple._2);
            }
        });

// 將兩個處理後的RDD進行join便可。
JavaPairRDD<String, Tuple2<String, Row>> joinedRDD = mappedRDD.join(expandedRDD);

13.4.8 解決方案八:多種方案組合使用

在實踐中發現,不少狀況下,若是隻是處理較爲簡單的數據傾斜場景,那麼使用上述方案中的某一種基本就能夠解決。可是若是要處理一個較爲複雜的數據傾斜場景,那麼可能須要將多種方案組合起來使用。好比說,咱們針對出現了多個數據傾斜環節的Spark做業,能夠先運用解決方案一和二,預處理一部分數據,並過濾一部分數據來緩解;其次能夠對某些shuffle操做提高並行度,優化其性能;最後還能夠針對不一樣的聚合或join操做,選擇一種方案來優化其性能。你們須要對這些方案的思路和原理都透徹理解以後,在實踐中根據各類不一樣的狀況,靈活運用多種方案,來解決本身的數據傾斜問題。

相關文章
相關標籤/搜索