Linux環境Spark安裝配置及使用

Linux環境Spark安裝配置及使用

1. 認識Spark

(1) Spark介紹

  • 大數據計算引擎
  • 官網:spark.apache.org/
  • 官方介紹:Apache Spark™ is a unified analytics engine for large-scale data processing.(Apache Spark™是一個用於大規模數據處理的統一分析引擎。)
  • Spark是一種快速、通用、可擴展的大數據分析引擎,2009年誕生於加州大學伯克利分校AMPLab,2010年開源,2013年6月成爲Apache孵化項目,2014年2月成爲Apache頂級項目。目前,Spark生態系統已經發展成爲一個包含多個子項目的集合,其中包含SparkSQL、Spark Streaming、GraphX、MLlib等子項目,Spark是基於內存計算的大數據並行計算框架。Spark基於內存計算,提升了在大數據環境下數據處理的實時性,同時保證了高容錯性和高可伸縮性,容許用戶將Spark部署在大量廉價硬件之上,造成集羣。
  • Spark生態圈:
    • Spark Core:RDD(彈性分佈式數據集)
    • Spark SQL
    • Spark Streaming
    • Spark MLLib:協同過濾,ALS,邏輯迴歸等等 --> 機器學習
    • Spark Graphx:圖計算

(2) 爲何要學習Spark

  • Hadoop的MapReduce計算模型存在的問題:
    • MapReduce的核心是Shuffle(洗牌)。在整個Shuffle的過程當中,至少會產生6次的I/O。 html

    • 中間結果輸出:基於MapReduce的計算引擎一般會將中間結果輸出到磁盤上,進行存儲和容錯。另外,當一些查詢(如:Hive)翻譯到MapReduce任務時,每每會產生多個Stage(階段),而這些串聯的Stage又依賴於底層文件系統(如HDFS)來存儲每個Stage的輸出結果,而I/O的效率每每較低,從而影響了MapReduce的運行速度。java

  • Spark的最大特色:基於內存
  • Spark是MapReduce的替代方案,並且兼容HDFS、Hive,可融入Hadoop的生態系統,彌補MapReduce的不足。

(3) Spark的特色:快、易用、通用、兼容

  • ——與Hadoop的MapReduce相比,Spark基於內存的運算速度要快100倍以上,即便,Spark基於硬盤的運算也要快10倍。Spark實現了高效的DAG執行引擎,從而能夠經過內存來高效處理數據流。
  • 易用——Spark支持Java、Python和Scala的API,還支持超過80種高級算法,使用戶能夠快速構建不一樣的應用。並且Spark支持交互式的Python和Scala的shell,能夠很是方便地在這些shell中使用Spark集羣來驗證解決問題的方法。
  • 通用——Spark提供了統一的解決方案。Spark能夠用於批處理、交互式查詢(Spark SQL)、實時流處理(Spark Streaming)、機器學習(Spark MLlib)和圖計算(GraphX)。這些不一樣類型的處理均可以在同一個應用中無縫使用。Spark統一的解決方案很是具備吸引力,畢竟任何公司都想用統一的平臺去處理遇到的問題,減小開發和維護的人力成本和部署平臺的物力成本。另外Spark還能夠很好的融入Hadoop的體系結構中能夠直接操做HDFS,並提供Hive on Spark、Pig on Spark的框架集成Hadoop。
  • 兼容——Spark能夠很是方便地與其餘的開源產品進行融合。好比,Spark可使用Hadoop的YARN和ApacheMesos做爲它的資源管理和調度器,而且能夠處理全部Hadoop支持的數據,包括HDFS、HBase和Cassandra等。這對於已經部署Hadoop集羣的用戶特別重要,由於不須要作任何數據遷移就可使用Spark的強大處理能力。Spark也能夠不依賴於第三方的資源管理和調度器,它實現了Standalone做爲其內置的資源管理和調度框架,這樣進一步下降了Spark的使用門檻,使得全部人均可以很是容易地部署和使用Spark。此外,Spark還提供了在EC2上部署Standalone的Spark集羣的工具。

2. Spark體系架構

  • Spark的運行方式
    • Yarn
    • Standalone:本機調試(demo)
  • Worker(從節點):每一個服務器上,資源和任務的管理者,只負責管理一個節點。
  • 執行過程:
    • 一個Worker 有多個 Executor。 Executor是任務的執行者,按階段(stage)劃分任務。—> RDD
  • 客戶端:Driver Program 提交任務到集羣中。
    • spark-submit
    • spark-shell

3. Spark-2.1.0安裝流程

(1) 準備工做

  • 具有java環境
  • 配置主機名
  • 配置免密碼登陸
  • 防火牆關閉

(2) 解壓spark-2.1.0-bin-hadoop2.7.tgz安裝包到目標目錄下:

  • tar -zxvf .tar.gz -C 目標目錄

(3) 爲後續方便,重命名Spark文件夾:

  • mv spark-2.1.0-bin-hadoop2.7/ spark-2.1.0

(4) Spark目錄介紹

  • bin —— Spark操做命令
  • conf —— 配置文件
  • data —— Spark測試文件
  • examples —— Spark示例程序
  • jars
  • LICENSE
  • licenses
  • NOTICE
  • python
  • R
  • README.md
  • RELEASE
  • sbin —— Spark集羣命令
  • yarn —— Spark-yarn配置

(5) 修改配置文件:

  • <1>. 配置spark-env.sh:
    • 進入spark-2.1.0/conf路徑,重命名配置文件:
      • mv spark-env.sh.template spark-env.sh
    • 修改spark-env.sh信息:
      • vi spark-env.sh
      • export JAVA_HOME=/opt/module/jdk1.8.0_144
        export SPARK_MASTER_HOST=bigdata01
        export SPARK_MASTER_PORT=7077
        複製代碼
  • <2>. 配置slaves:
    • 進入spark-2.1.0/conf路徑,重命名配置文件:
      • mv slaves.template slaves
    • 修改slaves信息:
      • vi slaves
      • bigdata02
        bigdata03
        複製代碼

(6) 配置環境變量:

  • 修改配置文件:
    • vi /etc/profile
  • 增長如下內容:
    • export SPARK_HOME=spark安裝路徑
    • export PATH=$PATH:$SPARK_HOME/bin
    • export PATH=$PATH:$SPARK_HOME/sbin
  • 聲明環境變量:
    • source /etc/profile

(6) 集羣配置:

  • 拷貝配置好的spark到其餘機器上
    • scp -r spark-2.1.0/ bigdata02:$PWD
    • scp -r spark-2.1.0/ bigdata03:$PWD

(7) 啓動:

  • 啓動主節點:
    • start-master.sh
  • 啓動從節點:
    • start-slaves.sh
  • 啓動shell:
    • spark-shell
  • 經過網頁端查看:

(8) 關閉:

  • 關閉主節點:
    • stop-master.sh
  • 關閉從節點:
    • stop-slaves.sh

4. Spark HA的實現

(1) 基於文件系統的單點恢復

  • 主要用於開發或測試環境。node

  • 當spark提供目錄保存spark Application和worker的註冊信息,並將他們的恢復狀態寫入該目錄中,一旦Master發生故障,就能夠經過從新啓動Master進程(sbin/start-master.sh),恢復已運行的spark Application和worker的註冊信息。python

  • 基於文件系統的單點恢復,主要是在spark-env.sh裏對SPARK_DAEMON_JAVA_OPTS設置 mysql

    • 建立存放文件夾:mkdir /opt/module/spark-2.1.0/recovery
    • 修改配置信息:
      • vi spark-env.sh
      • 增長內容:export SPARK_DAEMON_JAVA_OPTS="-Dspark.deploy.recoveryMode=FILESYSTEM -Dspark.deploy.recoveryDirectory=/opt/module/spark-2.1.0/recovery"

(2) 基於Zookeeper的Standby Masters

  • 適用於現實生產。es6

  • ZooKeeper提供了一個Leader Election機制,利用這個機制能夠保證雖然集羣存在多個Master,可是隻有一個是Active的,其餘的都是Standby。當Active的Master出現故障時,另外的一個Standby Master會被選舉出來。因爲集羣的信息,包括Worker,Driver和Application的信息都已經持久化到ZooKeeper,所以在切換的過程當中只會影響新Job的提交,對於正在進行的Job沒有任何的影響。加入ZooKeeper的集羣總體架構以下圖所示: web

  • 修改配置信息:算法

    • vi spark-env.sh
    • 增長內容:export SPARK_DAEMON_JAVA_OPTS="-Dspark.deploy.recoveryMode=ZOOKEEPER -Dspark.deploy.zookeeper.url=bigdata01:2181,bigdata02:2181,bigdata03:2181 -Dspark.deploy.zookeeper.dir=/spark"
    • 註釋掉:export SPARK_MASTER_HOSTexport SPARK_MASTER_PORT
  • 發送新的配置文件到集羣其他節點:sql

    • scp spark-env.sh bigdata02:$PWD
    • scp spark-env.sh bigdata03:$PWD

5. 執行Spark的任務

(1) spark-submit

  • 用於提交Spark的任務(任務即相關jar包)
  • e.g.: 蒙特卡洛求PI(圓周率)
    • 原理:以下圖所示,隨機向正方形內落點,經過統計正方形內全部點數和落入圓內的點數來計算佔比,得出正方形與圓的面積近似比值,進而近似出PI值。
    • 命令:
      • spark-submit --master spark://XXXX:7077 (指明master地址) --class org.apache.spark.examples.SparkPi (指明主程序的名字) /XXXX/spark/examples/jars/spark-examples_2.11-2.1.0.jar(指明jar包地址) 100(指明運行次數)

(2) spark-shell

  • 至關於REPL,做爲一個獨立的Application運行
  • spark-shell是Spark自帶的交互式Shell程序,方便用戶進行交互式編程,用戶能夠在該命令行下用scala編寫spark程序。
  • 參數說明:
    • --master spark://XXXX:7077 指定Master的地址
    • --executor-memory 2g 指定每一個worker可用內存爲2G
    • --total-executor-cores 2 指定整個集羣使用的cup核數爲2個
  • Spark Session 是 2.0 之後提供的,利用 SparkSession 能夠訪問spark全部組件
  • 兩種運行模式:
    • <1>. 本地模式
      • 啓動:spark-shell(後面不接任何參數)
    • <2>. 集羣模式
      • 啓動:spark-shell --master spark://XXXX:7077(指明master地址)
  • e.g.: 編寫WordCount程序
    • <1>. 處理本地文件,把結果打印到屏幕上
      • 啓動:spark-shell
      • 傳入文件:sc.textFile("/XXXX/WordCount.txt")(本地文件路徑).flatMap(_.split(" "))(按照空格分割).map((_,1))(單詞遍歷).reduceByKey(_+_)(單詞計數).collect
    • <2>. 處理HDFS文件,結果保存在hdfs上
      • 啓動:spark-shell --master spark://XXXX:7077(指
      • sc.textFile("hdfs://XXXX:9000/sp_wc.txt").flatMap(.split(" ")).map((,1)).reduceByKey(+).saveAsTextFile("hdfs://XXXX:9000/output/spark/WordCount")

(3) 單步運行WordCount -> RDD

  • 啓動shell:spark-shell
  • scala> val rdd1 = sc.textFile("/root/sp_wc.txt")
      rdd1: org.apache.spark.rdd.RDD[String] = /root/sp_wc.txt MapPartitionsRDD[1] at textFile at <console>:24
      
      scala> rdd1.collect
      res0: Array[String] = Array(I love Scala, I love Skark, 2019/5/8)
      
      scala> val rdd2 = rdd1.flatMap(_.split(" "))
      rdd2: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[2] at flatMap at <console>:26
      
      scala> rdd2.collect
      res1: Array[String] = Array(I, love, Scala, I, love, Skark, 2019/5/8)
      
      scala> val rdd3 = rdd2.map((_,1))
      rdd3: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[3] at map at <console>:28
      
      scala> rdd3.collect
      res2: Array[(String, Int)] = Array((I,1), (love,1), (Scala,1), (I,1), (love,1), (Skark,1), (2019/5/8,1))
      
      scala> val rdd4 = rdd3.reduceByKey(_+_)
      rdd4: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[4] at reduceByKey at <console>:30
      
      scala> rdd4.collect
      res3: Array[(String, Int)] = Array((2019/5/8,1), (love,2), (I,2), (Skark,1), (Scala,1))
    複製代碼

(4) 在IDE中運行WorkCount

  • <1>. scala版本
    • import org.apache.spark.SparkConf
      import org.apache.spark.SparkContext
      
      object WordCount {
        
        def main(args: Array[String]): Unit = {
          
          //建立一個Spark配置文件
          val conf = new SparkConf().setAppName("Scala WordCount").setMaster("local")
          
          //建立Spark對象
          val sc = new SparkContext(conf)
          
          val result = sc.textFile(args(0))
            .flatMap(_.split(" "))
            .map((_, 1))
            .reduceByKey(_ + _)
            .saveAsTextFile(args(1))
      
          sc.stop()
        }
      }
      複製代碼
  • <2>. Java版本
    • import java.util.Arrays;
      import java.util.Iterator;
      import java.util.List;
      
      import org.apache.spark.SparkConf;
      import org.apache.spark.api.java.JavaPairRDD;
      import org.apache.spark.api.java.JavaRDD;
      import org.apache.spark.api.java.JavaSparkContext;
      import org.apache.spark.api.java.function.FlatMapFunction;
      import org.apache.spark.api.java.function.Function2;
      import org.apache.spark.api.java.function.PairFunction;
      
      import parquet.format.PageHeader;
      import scala.Tuple2;
      
      public class WordCount {
      
      	public static void main(String[] args) {
      		// TODO Auto-generated method stub
      
      		SparkConf conf = new SparkConf()
      				.setAppName("JavaWordCount")
      				.setMaster("local") ;
      
      		//新建SparkContext對象
      		JavaSparkContext sc = new JavaSparkContext(conf) ;
      		
      		//讀入數據
      		JavaRDD<String> lines = sc.textFile("hdfs://XXXX:9000/WordCount.txt") ;
      		
      		//分詞 第一個參數表示讀進來的話 第二個參數表示 返回值
      		JavaRDD<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
      		
      		@Override
      		public Iterator<String> call(String input) throws Exception {
      			
      			return Arrays.asList(input.split(" ")).iterator() ;
      		    }
      		}) ;
      		
      		//每一個單詞記一次數 
      		/*
      		* String, String, Integer
      		* input   <key      value>
      		*/
      		JavaPairRDD<String, Integer> ones = words.mapToPair(new PairFunction<String, String, Integer>() {
      		
      		@Override
      		public Tuple2<String, Integer> call(String input) throws Exception {
      			
      			return new Tuple2<String, Integer>(input, 1) ;
      		}
      		}) ;
      		
      		//執行reduce操做
      		/*
      		* Integer, Integer, Integer
      		* nteger arg0, Integer arg1 返回值
      		*/
      		JavaPairRDD<String,Integer> counts = ones.reduceByKey(new Function2<Integer, Integer, Integer>() {
      		
      			@Override
      			public Integer call(Integer arg0, Integer arg1) throws Exception {
      				// TODO Auto-generated method stub
      				return arg0 + arg1 ;
      			}
      		}) ;
      		
      		//打印結果
      		List<Tuple2<String, Integer>> output = counts.collect() ;
      		
      		for (Tuple2<String, Integer> tuple :output) {
      			System.out.println(tuple._1 + " : " + tuple._2) ;
      		}
      		
      		sc.stop() ;
      		
      		}
      }
      複製代碼

(5) WordCount程序處理過程

(6) Spark提交任務的流程

6. Spark的算子

(1) RDD基礎

  • <1>. 什麼是RDD
    • RDD(Resilient Distributed Dataset)叫作彈性分佈式數據集,是Spark中最基本的數據抽象,它表明一個不可變、可分區、裏面的元素可並行計算的集合。RDD具備數據流模型的特色:自動容錯、位置感知性調度和可伸縮性。RDD容許用戶在執行多個查詢時顯式地將工做集緩存在內存中,後續的查詢可以重用工做集,這極大地提高了查詢速度。
  • <2>. RDD的屬性(源碼中的一段話)
    • **一組分片(Partition)。**即數據集的基本組成單位。對於RDD來講,每一個分片都會被一個計算任務處理,並決定並行計算的粒度。用戶能夠在建立RDD時指定RDD的分片個數,若是沒有指定,那麼就會採用默認值。默認值就是程序所分配到的CPU Core的數目。
    • **一個計算每一個分區的函數。**Spark中RDD的計算是以分片爲單位的,每一個RDD都會實現compute函數以達到這個目的。compute函數會對迭代器進行復合,不須要保存每次計算的結果。
    • **RDD之間的依賴關係。**RDD的每次轉換都會生成一個新的RDD,因此RDD之間就會造成相似於流水線同樣的先後依賴關係。在部分分區數據丟失時,Spark能夠經過這個依賴關係從新計算丟失的分區數據,而不是對RDD的全部分區進行從新計算。
    • **一個Partitioner,即RDD的分片函數。**當前Spark中實現了兩種類型的分片函數,一個是基於哈希的HashPartitioner,另一個是基於範圍的RangePartitioner。只有對於於key-value的RDD,纔會有Partitioner,非key-value的RDD的Parititioner的值是None。Partitioner函數不但決定了RDD自己的分片數量,也決定了parent RDD Shuffle輸出時的分片數量。
    • **一個列表。**存儲存取每一個Partition的優先位置(preferred location)。對於一個HDFS文件來講,這個列表保存的就是每一個Partition所在的塊的位置。按照「移動數據不如移動計算」的理念,Spark在進行任務調度的時候,會盡量地將計算任務分配到其所要處理數據塊的存儲位置。
  • <3>. RDD的建立方式
    • 經過外部的數據文件建立,如HDFS:
      • val rdd1 = sc.textFile(「hdfs://XXXX:9000/data.txt」)
    • 經過sc.parallelize進行建立:
      • val rdd1 = sc.parallelize(Array(1,2,3,4,5,6,7,8))
    • DD的類型:Transformation和Action
  • <4>. RDD的基本原理

(2) Transformation

  • RDD中的全部轉換都是延遲加載的,也就是說,它們並不會直接計算結果。相反的,它們只是記住這些應用到基礎數據集(例如一個文件)上的轉換動做。只有當發生一個要求返回結果給Driver的動做時,這些轉換纔會真正運行。這種設計讓Spark更加有效率地運行。

(3) Action

(4) RDD的緩存機制

  • RDD經過persist方法或cache方法能夠將前面的計算結果緩存,可是並非這兩個方法被調用時當即緩存,而是觸發後面的action時,該RDD將會被緩存在計算節點的內存中,並供後面重用。
  • 經過查看源碼發現cache最終也是調用了persist方法,默認的存儲級別都是僅在內存存儲一份,Spark的存儲級別還有好多種,存儲級別在object StorageLevel中定義的。
  • 緩存有可能丟失,或者存儲存儲於內存的數據因爲內存不足而被刪除,RDD的緩存容錯機制保證了即便緩存丟失也能保證計算的正確執行。經過基於RDD的一系列轉換,丟失的數據會被重算,因爲RDD的各個Partition是相對獨立的,所以只須要計算丟失的部分便可,並不須要重算所有Partition。
    • Demo示例:
    • 經過UI進行監控:

(5) RDD的Checkpoint(檢查點)機制:容錯機制

  • 檢查點(本質是經過將RDD寫入Disk作檢查點)是爲了經過lineage(血統)作容錯的輔助,lineage過長會形成容錯成本太高,這樣就不如在中間階段作檢查點容錯,若是以後有節點出現問題而丟失分區,從作檢查點的RDD開始重作Lineage,就會減小開銷。
  • 設置checkpoint的目錄,能夠是本地的文件夾、也能夠是HDFS。通常是在具備容錯能力,高可靠的文件系統上(好比HDFS, S3等)設置一個檢查點路徑,用於保存檢查點數據。
  • 分別舉例說明:
    • <1>. 本地目錄
    • 注意:這種模式,須要將spark-shell運行在本地模式上
    • <2>. HDFS的目錄
    • 注意:這種模式,須要將spark-shell運行在集羣模式上

(6) RDD的依賴關係和Spark任務中的Stage

  • RDD的依賴關係shell

    • RDD和它依賴的父RDD(s)的關係有兩種不一樣的類型,即窄依賴(narrow dependency)和寬依賴(wide dependency)。

    • 窄依賴指的是每個父RDD的Partition最多被子RDD的一個Partition使用

      • 總結:窄依賴咱們形象的比喻爲獨生子女
    • 寬依賴指的是多個子RDD的Partition會依賴同一個父RDD的Partition

      • 總結:窄依賴咱們形象的比喻爲超生
  • Spark任務中的Stage

    • DAG(Directed Acyclic Graph)叫作有向無環圖,原始的RDD經過一系列的轉換就就造成了DAG,根據RDD之間的依賴關係的不一樣將DAG劃分紅不一樣的Stage,對於窄依賴,partition的轉換處理在Stage中完成計算。對於寬依賴,因爲有Shuffle的存在,只能在parent RDD處理完成後,才能開始接下來的計算,所以寬依賴是劃分Stage的依據

(7) RDD基礎練習

  • 練習1:

  • //經過並行化生成rdd
      val rdd1 = sc.parallelize(List(5, 6, 4, 7, 3, 8, 2, 9, 1, 10))
      //對rdd1裏的每個元素乘2而後排序
      val rdd2 = rdd1.map(_ * 2).sortBy(x => x, true)
      //過濾出大於等於十的元素
      val rdd3 = rdd2.filter(_ >= 10)
      //將元素以數組的方式在客戶端顯示
      rdd3.collect
    複製代碼
  • 練習2:

  • val rdd1 = sc.parallelize(Array("a b c", "d e f", "h i j"))
      //將rdd1裏面的每個元素先切分在壓平
      val rdd2 = rdd1.flatMap(_.split(' '))
      rdd2.collect
    複製代碼
  • 練習3:

  • val rdd1 = sc.parallelize(List(5, 6, 4, 3))
      val rdd2 = sc.parallelize(List(1, 2, 3, 4))
      //求並集
      val rdd3 = rdd1.union(rdd2)
      //求交集
      val rdd4 = rdd1.intersection(rdd2)
      //去重
      rdd3.distinct.collect
      rdd4.collect
    複製代碼
  • 練習4:

  • val rdd1 = sc.parallelize(List(("tom", 1), ("jerry", 3), ("kitty", 2)))
      val rdd2 = sc.parallelize(List(("jerry", 2), ("tom", 1), ("shuke", 2)))
      //求jion
      val rdd3 = rdd1.join(rdd2)
      rdd3.collect
      //求並集
      val rdd4 = rdd1 union rdd2
      //按key進行分組
      rdd4.groupByKey
      rdd4.collect
    複製代碼
  • 練習5:

  • val rdd1 = sc.parallelize(List(("tom", 1), ("tom", 2), ("jerry", 3), ("kitty", 2)))
      val rdd2 = sc.parallelize(List(("jerry", 2), ("tom", 1), ("shuke", 2)))
      //cogroup
      val rdd3 = rdd1.cogroup(rdd2)
      //注意cogroup與groupByKey的區別
      rdd3.collect
    複製代碼
  • 練習6:

  • val rdd1 = sc.parallelize(List(1, 2, 3, 4, 5))
      //reduce聚合
      val rdd2 = rdd1.reduce(_ + _)
      rdd2.collect
    複製代碼
  • 練習7:

  • val rdd1 = sc.parallelize(List(("tom", 1), ("jerry", 3), ("kitty", 2),  ("shuke", 1)))
      val rdd2 = sc.parallelize(List(("jerry", 2), ("tom", 3), ("shuke", 2), ("kitty", 5)))
      val rdd3 = rdd1.union(rdd2)
      //按key進行聚合
      val rdd4 = rdd3.reduceByKey(_ + _)
      rdd4.collect
      //按value的降序排序
      val rdd5 = rdd4.map(t => (t._2, t._1)).sortByKey(false).map(t => (t._2, t._1))
      rdd5.collect 
    複製代碼

    7. Spark RDD的高級算子

(1) mapPartitionsWithIndex

  • 把每一個partition中的分區號和對應的值拿出來
    • def mapPartitionsWithIndex[U](f: (Int, Iterator[T]) ⇒ Iterator[U], preservesPartitioning: Boolean = false)(implicit arg0: ClassTag[U]): RDD[U]
  • f中函數參數:
    • 第一個參數是Int,表明分區號
    • 第二個Iterator[T]表明分區中的元素
  • e.g.: 將每一個分區中的元素和分區號打印出來
    • val rdd1 = sc.parallelize(List(1,2,3,4,5,6,7,8,9), 2)
    • 建立一個函數返回RDD中的每一個分區號和元素:
      • def func1(index:Int, iter:Iterator[Int]):Iterator[String] ={
            iter.toList.map( x => "[PartID:" + index + ", value=" + x + "]" ).iterator
        }
        複製代碼
    • 調用:rdd1.mapPartitionsWithIndex(func1).collect

(2) aggregate

  • 先對局部聚合,再對全局聚合
  • e.g.: val rdd1 = sc.parallelize(List(1,2,3,4,5), 2)
    • 查看每一個分區中的元素:

      • scala> rdd1.mapPartitionsWithIndex(fun1).collect
          	res4: Array[String] = Array(
          	[partId : 0 , value = 1 ], [partId : 0 , value = 2 ], 
          	[partId : 1 , value = 3 ], [partId : 1 , value = 4 ], [partId : 1 , value = 5 ])
        複製代碼
    • 將每一個分區中的最大值求和,注意初始值是0:

      • scala> rdd2.aggregate(0)(max(_,_),_+_)
          	res6: Int = 7
        複製代碼
      • 若是初始值時候100,則結果爲300:
      • scala> rdd2.aggregate(100)(max(_,_),_+_)
          	res8: Int = 300
            ```
        複製代碼
    • 若是是求和,注意初始值是0:

      • scala> rdd2.aggregate(0)(_+_,_+_)
          	res9: Int = 15
        複製代碼
      • 若是初始值是10,則結果是45
      • scala> rdd2.aggregate(10)(_+_,_+_)
          	res10: Int = 45  
        複製代碼
    • e.g. —— 字符串:

      • val rdd2 = sc.parallelize(List("a","b","c","d","e","f"),2)
      • 修改一下剛纔的查看分區元素的函數
        • def func2(index: Int, iter: Iterator[(String)]) : Iterator[String] = {
              iter.toList.map(x => "[partID:" +  index + ", val: " + x + "]").iterator
          }
          複製代碼
        • 兩個分區中的元素:
          • [partID:0, val: a], [partID:0, val: b], [partID:0, val: c],
            [partID:1, val: d], [partID:1, val: e], [partID:1, val: f]
            複製代碼
        • 運行結果:
    • e.g.:

      • val rdd3 = sc.parallelize(List("12","23","345","4567"),2)
        rdd3.aggregate("")((x,y) => math.max(x.length, y.length).toString, (x,y) => x + y)
        複製代碼
      • 結果多是24,也多是42

      • val rdd4 = sc.parallelize(List("12","23","345",""),2)
        rdd4.aggregate("")((x,y) => math.min(x.length, y.length).toString, (x,y) => x + y)
        複製代碼
      • 結果是10,也多是01

      • 緣由:注意有個初始值"",其長度0,而後0.toString變成字符串

      • val rdd5 = sc.parallelize(List("12","23","","345"),2)
        rdd5.aggregate("")((x,y) => math.min(x.length, y.length).toString, (x,y) => x + y)
        複製代碼
      • 結果是11,緣由同上。

(3) aggregateByKey

  • 準備數據:

    • val pairRDD = sc.parallelize(List( ("cat",2), ("cat", 5), ("mouse", 4),("cat", 12), ("dog", 12), ("mouse", 2)), 2)
      def func3(index: Int, iter: Iterator[(String, Int)]) : Iterator[String] = {
        iter.toList.map(x => "[partID:" +  index + ", val: " + x + "]").iterator
      }
      複製代碼
  • 兩個分區中的元素:

  • e.g.:

    • 將每一個分區中的動物最多的個數求和
    • scala> pairRDD.aggregateByKey(0)(math.max(_, _), _ + _).collect
      res69: Array[(String, Int)] = Array((dog,12), (cat,17), (mouse,6))
      複製代碼
    • 將每種動物個數求和
    • scala> pairRDD.aggregateByKey(0)(_+_, _ + _).collect
      res71: Array[(String, Int)] = Array((dog,12), (cat,19), (mouse,6))
      複製代碼
    • 這個例子也可使用:reduceByKey
    • scala> pairRDD.reduceByKey(_+_).collect
      res73: Array[(String, Int)] = Array((dog,12), (cat,19), (mouse,6))
      複製代碼

(4) coalesce與repartition

  • 都是將RDD中的分區進行重分區。
  • 區別:
    • coalesce默認不會進行shuffle(false);
    • repartition會進行shuffle(true),會將數據真正經過網絡進行重分區。
  • e.g.:
    • def func4(index: Int, iter: Iterator[(Int)]) : Iterator[String] = {
         iter.toList.map(x => "[partID:" +  index + ", val: " + x + "]").iterator
      }
       
      val rdd1 = sc.parallelize(List(1,2,3,4,5,6,7,8,9), 2)
       
      下面兩句話是等價的:
      val rdd2 = rdd1.repartition(3)
      val rdd3 = rdd1.coalesce(3,true) -> 若是是false,查看RDD的length依然是2
      複製代碼

(5) 其餘高級算子

8. Spark 基礎編程案例

(1) 求網站的訪問量

  • Tomcat的訪問日誌以下:

  • 需求:找到訪問量最高的兩個網頁,要求顯示網頁名稱和訪問量

  • 步驟分析:

    • <1>. 對網頁的訪問量求和
    • <2>. 降序排序
  • 代碼:

    • import org.apache.spark.SparkConf
      import org.apache.spark.SparkContext
      
      object TomcatLogCount {
        
        def main(args: Array[String]): Unit = {
          
          val conf = new SparkConf().setMaster("local").setAppName("TomcatLogCount")
          val sc = new SparkContext(conf)
          
          /*
           * 讀入日誌並解析
           * 
           * 192.168.88.1 - - [30/Jul/2017:12:54:37 +0800] "GET /MyDemoWeb/oracle.jsp HTTP/1.1" 200 242
           * */
          
          val rdd1 = sc.textFile(" ").map(
              line => {
                //解析字符串,獲得jsp的名字
                //1. 解析兩個引號間的字符串
                val index1 = line.indexOf("\"")
                val index2 = line.lastIndexOf("\"")
                //line1 = GET /MyDemoWeb/oracle.jsp HTTP/1.1
                val line1 = line.substring(index1 + 1, index2)
                
                val index3 = line1.indexOf(" ")
                val index4 = line1.lastIndexOf(" ")
                //line2 = /MyDemoWeb/oracle.jsp
                val line2 = line1.substring(index3 + 1, index4)
                
                //獲得jsp的名字  oracle.jsp
                val jspName = line2.substring(line2.lastIndexOf("/"))
                
                (jspName, 1)
              }
              )
          //統計每一個jsp的次數
          val rdd2 = rdd1.reduceByKey(_+_)
          
          //使用Value排序
          val rdd3 = rdd2.sortBy(_._2, false)
          
          //獲得次數最多的兩個jsp
          rdd3.take(2).foreach(println)
        
          sc.stop()
        }
      }
      複製代碼

(2) 建立自定義分區

  • 根據jsp文件的名字,將各自的訪問日誌放入到不一樣的分區文件中,以下:
    • 生成的分區文件

    • 如:part-00000文件中的內容:只包含了web.jsp的訪問日誌

  • 代碼:
    • import org.apache.spark.SparkConf
      import org.apache.spark.SparkContext
      import scala.collection.mutable.HashMap
      
      
      object TomcatLogPartitioner {
        
        def main(args: Array[String]): Unit = {
          
          val conf = new SparkConf().setMaster("local").setAppName("TomcatLogPartitioner")
          val sc = new SparkContext(conf)
          
          /*
           * 讀入日誌並解析
           * 
           * 192.168.88.1 - - [30/Jul/2017:12:54:37 +0800] "GET /MyDemoWeb/oracle.jsp HTTP/1.1" 200 242
           * */
          
          val rdd1 = sc.textFile(" ").map(
              line => {
                //解析字符串,獲得jsp的名字
                //1. 解析兩個引號間的字符串
                val index1 = line.indexOf("\"")
                val index2 = line.lastIndexOf("\"")
                //line1 = GET /MyDemoWeb/oracle.jsp HTTP/1.1
                val line1 = line.substring(index1 + 1, index2)
                
                val index3 = line1.indexOf(" ")
                val index4 = line1.lastIndexOf(" ")
                //line2 = /MyDemoWeb/oracle.jsp
                val line2 = line1.substring(index3 + 1, index4)
                
                //獲得jsp的名字  oracle.jsp
                val jspName = line2.substring(line2.lastIndexOf("/"))
                
                (jspName, line)
              }
              )
              
              //獲得不重複的jsp名字
              val rdd2 = rdd1.map(_._1).distinct().collect()
              
              //建立分區規則
              val wepPartitioner = new WepPartitioner(rdd2)
              val rdd3 = rdd1.partitionBy(wepPartitioner)
              
              //輸出rdd3
              rdd3.saveAsTextFile(" ")
          
        }
        
        //定義分區規則
        class WepPartitioner(jspList : Array[String]) extends Partitioner {
          
          /*
           * 定義集合來保存分區條件:
           * String 表明jsp的名字
           * Int 表明序號
           * */ 
           
          val partitionMap = new HashMap[String, Int]()
          //初始分區號
          val partID = 0
          //填值
          for (jsp <- jspList) {
            patitionMap.put(jsp, partID)
            partID += 1
          }
          
          //返回分區個數
          def numPartitioners : Int = partitionMap.size
          
          //根據jsp,返回對應的分區
            def getPartition(key : Any) : Int = partitionMap.getOrElse(key.toString(), 0)
            
        }
        
      }
      複製代碼

(3) 使用JDBCRDD 訪問數據庫

  • JdbcRDD參數說明:

  • 從上面的參數說明能夠看出,JdbcRDD有如下兩個缺點:

    • <1>. 執行的SQL必須有兩個參數,並類型都是Long
    • <2>. 獲得的結果是ResultSet,即:只支持select操做
  • 代碼:

    • import org.apache.spark.SparkConf
      import org.apache.spark.SparkContext
      import java.sql.Connection
      import java.sql.DriverManager
      import java.sql.PreparedStatement
      
      /*
       * 把Spark結果存放到mysql數據庫中
       *
       */
      
      object TomcatLogCountToMysql {
        def main(args: Array[String]): Unit = {
          //建立SparkContext
          val conf = new SparkConf().setMaster("local").setAppName("MyTomcatLogCountToMysql")
      
          val sc = new SparkContext(conf)
      
          /*
           *
           * 讀入日誌 解析:
           *
           * 192.168.88.1 - - [30/Jul/2017:12:54:37 +0800] "GET /MyDemoWeb/oracle.jsp HTTP/1.1" 200 242
           */
      
          val rdd1 = sc.textFile("H:\\tmp_files\\localhost_access_log.txt")
            .map(
              line => {
                //解析字符串,獲得jsp的名字
                //一、解析兩個引號之間的字符串
                val index1 = line.indexOf("\"")
                val index2 = line.lastIndexOf("\"")
                val line1 = line.substring(index1 + 1, index2) // GET /MyDemoWeb/oracle.jsp HTTP/1.1
      
                //獲得兩個空格的位置
                val index3 = line1.indexOf(" ")
                val index4 = line1.lastIndexOf(" ")
                val line2 = line1.substring(index3 + 1, index4) // /MyDemoWeb/oracle.jsp
      
                //獲得jsp的名字
                val jspName = line2.substring(line2.lastIndexOf("/")) // oracle.jsp
      
                (jspName, 1)
              })
      
          //
          //    try {
          //      /*
          //       * create table mydata(jsname varchar(50),countNumber Int)
          //       *
          //       * foreach  沒有返回值,在本需求中,只須要寫數據庫,不須要返回新的RDD,因此用foreach便可
          //       *
          //       *
          //       * 運行 Task not serializable
          //       */
          //      conn = DriverManager.getConnection("jdbc:mysql://localhost:3306/company?serverTimezone=UTC&characterEncoding=utf-8", "root", "123456")
          //      pst = conn.prepareStatement("insert into mydata values(?,?)")
          //
          //      rdd1.foreach(f => {
          //        pst.setString(1, f._1)
          //        pst.setInt(2, f._2)
          //
          //        pst.executeUpdate()
          //      })
          //    } catch {
          //      case t: Throwable => t.printStackTrace()
          //    } finally {
          //      if (pst != null) pst.close()
          //      if (conn != null) conn.close()
          //    }
          //
          //    sc.stop()
          //    //存入數據庫
          //    var conn: Connection = null
          //    var pst: PreparedStatement = null
      
          //    //第一種修改方法
          //    /*
          //     * 修改思路:
          //     * conn pst 讓每個節點都是用到,須要在不一樣的節點上傳輸,實現sericalizable接口
          //     */
          //    try {
          //      rdd1.foreach(f => {
          //        conn = DriverManager.getConnection("jdbc:mysql://localhost:3306/company?serverTimezone=UTC&characterEncoding=utf-8", "root", "123456")
          //        pst = conn.prepareStatement("insert into mydata values(?,?)")
          //
          //        pst.setString(1, f._1)
          //        pst.setInt(2, f._2)
          //
          //        pst.executeUpdate()
          //      })
          //    } catch {
          //      case t: Throwable => t.printStackTrace()
          //    } finally {
          //      if (pst != null) pst.close()
          //      if (conn != null) conn.close()
          //    }
          //
          //    sc.stop()
      
          /*
           * 第一種修改方式,功能上能夠實現,但每條數據都會建立鏈接,對數據庫形成很大壓力
           *
           * 針對分區來操做:一個分區,創建一個鏈接便可
           */
          rdd1.foreachPartition(saveToMysql)
          sc.stop()
      
        }
      
        def saveToMysql(it: Iterator[(String, Int)]) = {
          var conn: Connection = null
          var pst: PreparedStatement = null
      
          try {
            conn = DriverManager.getConnection("jdbc:mysql://localhost:3306/company?serverTimezone=UTC&characterEncoding=utf-8", "root", "123456")
            pst = conn.prepareStatement("insert into mydata values(?,?)")
      
            it.foreach(f => {
      
              pst.setString(1, f._1)
              pst.setInt(2, f._2)
      
              pst.executeUpdate()
            })
          } catch {
            case t: Throwable => t.printStackTrace()
          } finally {
            if (pst != null) pst.close()
            if (conn != null) conn.close()
          }
        }
      
      }
      複製代碼

      9. 認識 Spark SQL

(1) 什麼是Spark SQL

  • Spark SQL is Apache Spark's module for working with structured data.(Spark SQL 是spark 的一個模塊,用來處理 結構化的數據。<不能處理非結構化的數據>)
  • Spark SQL是Spark用來處理結構化數據的一個模塊,它提供了一個編程抽象叫作DataFrame而且做爲分佈式SQL查詢引擎的做用。

(2) 爲何要學習Spark SQL

  • Hive是將HQL轉換成MapReduce而後提交到集羣上執行,大大簡化了編寫MapReduce的程序的複雜性,可是MapReduce這種計算模型執行效率比較慢。因此Spark SQL的應運而生,它是將Spark SQL轉換成RDD,而後提交到集羣執行,執行效率很是快,同時Spark SQL也支持從Hive中讀取數據,Hive 2.x 執行引擎可使用Spark。

(3) Spark SQL的特色:

  • <1>. 容易集成
    • 不須要單獨安裝。
  • <2>. 統一的數據訪問方式
    • 結構化數據(JDBC、JSon、Hive、parquer文件)均可以做爲Spark SQL 的數據源。
      • 對接多種數據源,且使用方式相似。
  • <3>. 兼容Hive
    • 把Hive中的數據,讀取到Spark SQL中運行。
  • <4>. 支持標準的數據鏈接(JDBC)

10. Spark SQL 基礎

(1) 基本概念:Datasets和DataFrames

  • <1>. DataFrame

    • DataFrame是組織成命名列的數據集。它在概念上等同於關係數據庫中的表,但在底層具備更豐富的優化。DataFrames能夠從各類來源構建,

    • 例如:

      • 結構化數據文件
      • Hive中的表
      • 外部數據庫或現有RDDs
    • DataFrame API支持的語言有Scala,Java,Python和R。

    • 從上圖能夠看出,DataFrame多了數據的結構信息,即schema。RDD是分佈式的 Java對象的集合。DataFrame是分佈式的Row對象的集合。DataFrame除了提供了比RDD更豐富的算子之外,更重要的特色是提高執行效率、減小數據讀取以及執行計劃的優化。

  • <2>. Datasets

    • Dataset是數據的分佈式集合。Dataset是在Spark 1.6中添加的一個新接口,是DataFrame之上更高一級的抽象。它提供了RDD的優勢(強類型化,使用強大的lambda函數的能力)以及Spark SQL優化後的執行引擎的優勢。一個Dataset 能夠從JVM對象構造,而後使用函數轉換(map, flatMap,filter等)去操做。Dataset API 支持Scala和Java,Python不支持Dataset API。

(2) DataFrames

  • <1>. 建立 DataFrames

    • a. 經過Case Class建立DataFrames
      • ① 定義case class(至關於表的結構:Schema)
        • case class Emp(empno:Int,ename:String,job:String,mgr:Int,hiredate:String,sal:Int,comm:Int,deptno:Int)
        • 注意:因爲mgr和comm列中包含null值,簡單起見,將對應的case class類型定義爲String
      • ② 將HDFS上的數據讀入RDD,並將RDD與case Class關聯
        • val lines = sc.textFile("/XXXX/emp.csv").map(_.split(","))
      • ③ 將RDD轉換成DataFrames
        • val allEmp = lines.map(x => Emp(x(0).toInt,x(1),x(2),x(3).toInt,x(4),x(5).toInt,x(6).toInt,x(7).toInt))
      • ④ 經過DataFrames查詢數據
        • val df1 = allEmp.toDF
        • df1.show
    • b. 使用SparkSession
      • 什麼是SparkSession
        • Apache Spark 2.0引入了SparkSession,其爲用戶提供了一個統一的切入點來使用Spark的各項功能,而且容許用戶經過它調用DataFrame和Dataset相關API來編寫Spark程序。最重要的是,它減小了用戶須要瞭解的一些概念,使得咱們能夠很容易地與Spark交互。
        • 在2.0版本以前,與Spark交互以前必須先建立SparkConf和SparkContext。然而在Spark 2.0中,咱們能夠經過SparkSession來實現一樣的功能,而不須要顯式地建立SparkConf, SparkContext 以及 SQLContext,由於這些對象已經封裝在SparkSession中。   - 建立StructType,來定義Schema結構信息
        • 注意:須要import org.apache.spark.sql.types._import org.apache.spark.sql.Row
        • import org.apache.spark.sql.types._
              
          		val myschema = StructType(
          		List(
          		StructField("empno",DataTypes.IntegerType),
          		StructField("ename",DataTypes.StringType),
          		StructField("job",DataTypes.StringType),
          		StructField("mgr",DataTypes.IntegerType),
          		StructField("hiredate",DataTypes.StringType),
          		StructField("sal",DataTypes.IntegerType),
          		StructField("comm",DataTypes.IntegerType),
          		StructField("deptno",DataTypes.IntegerType),
          		))
          		
          		val allEmp = lines.map(x => Row(x(0).toInt,x(1),x(2),x(3).toInt,x(4),x(5).toInt,x(6).toInt,x(7).toInt))
          	
          		import org.apache.spark.sql.Row
          		
          		val df2 = spark.createDataFrame(allEmp,myschema)
          複製代碼
    • c. 使用JSon文件來建立DataFame
      • val df3 = spark.read    讀文件,默認是Parquet文件
          	val df3 = spark.read.json("/XXXX/people.json")    讀json文件
          	
          	df3.show
          	
          	val df4 = spark.read.format("json").load("/XXXX/people.json")
        複製代碼
  • <2>. DataFrame 操做

    • DataFrame操做也稱爲無類型的Dataset操做

    • a. DSL語句

      • 查詢全部的員工姓名
      • 查詢全部的員工姓名和薪水,並給薪水加100塊錢
      • 查詢工資大於2000的員工
      • 求每一個部門的員工人數
      • 參考:spark.apache.org/docs/2.1.0/…
    • b. SQL語句

      • **注意:**不能直接執行SQL,須要生成一個視圖,再執行sql。
      • ① 將DataFrame註冊成表(視圖):df.createOrReplaceTempView("emp")
      • ② 執行查詢:
        • spark.sql("select * from emp").show
        • spark.sql("select * from emp where deptno=10").show
        • spark.sql("select deptno,sum(sal) from emp group by deptno").show

(3) Spark SQL 中的視圖

  • 視圖是一個虛表,不存儲數據。
  • 兩種類型:
    • <1>. 普通視圖(本地視圖)——createOrReplaceTempView

      • 只在當前Session中有效。
    • <2>. 全局視圖: ——createGlobalTempView

      • 在Spark SQL中,若是想擁有一個臨時的view,並想在不一樣的Session中共享,並且在application的運行週期內可用,那麼就須要建立一個全局的臨時view。並記得使用的時候加上global_temp做爲前綴來引用它,由於全局的臨時view是綁定到系統保留的數據庫global_temp上。
    • e.g.: ``` 建立一個新session,讀取不到emp視圖 spark.newSession.sql("select * from emp")

      如下兩種方式都可讀到 全局視圖 中的數據:
      df1.createGlobalTempView("emp1")
      spark.newSession.sql("select * from global_temp.emp1").show
      
      spark.sql("select * from global_temp.emp1").show
      複製代碼
      複製代碼

(4) 建立Datasets

  • DataFrame的引入,可讓Spark更好的處理結構數據的計算,但其中一個主要的問題是:缺少編譯時類型安全。爲了解決這個問題,Spark採用新的Dataset API (DataFrame API的類型擴展)。
  • Dataset是一個分佈式的數據收集器。這是在Spark1.6以後新加的一個接口,兼顧了RDD的優勢(強類型,可使用功能強大的lambda)以及Spark SQL的執行器高效性的優勢。因此能夠把DataFrames當作是一種特殊的Datasets,即:Dataset(Row)
  • 建立DataSet:
    • <1>. 使用序列
      • ① 定義case class:
        • case class MyData(a:Int,b:String)
      • ② 生成序列並建立DataSet:
        • val ds = Seq(MyData(1,"Tom"),MyData(2,"Mary")).toDS
      • ③ 查看結果
        • ds.show
    • <2>. 使用JSON數據
      • ① 定義case class:
        • case class Person(name: String, gender: String)
      • ② 經過JSON數據生成DataFrame:
        • val df = spark.read.json(sc.parallelize("""{"gender": "Male", "name": "Tom"}""" :: Nil))
      • ③ 將DataFrame轉成DataSet:
        • df.as[Person].show
        • df.as[Person].collect
    • <3>. 使用HDFS數據
      • ① 讀取HDFS數據,並建立DataSet:
        • val linesDS = spark.read.text("hdfs://XXXX:9000/XXXX/data.txt").as[String]
      • ② 對DataSet進行操做:分詞後,查詢長度大於3的單詞
        • val words = linesDS.flatMap(_.split(" ")).filter(_.length > 3)
          words.show
          words.collect
          複製代碼
      • ③ 執行WordCount程序
        • val result = linesDS.flatMap(_.split(" ")).map((_,1)).groupByKey(x => x._1).count
          result.show
          排序:result.orderBy($"value").show
          複製代碼

(5) Datasets 的操做案例

  • <1>. 使用emp.json 生成DataFrame:
    • val empDF = spark.read.json("/XXXX/emp.json")
      查詢工資大於3000的員工
      empDF.where($"sal" >= 3000).show
      複製代碼
  • <2>. 建立case class:
    • case class Emp(empno:Long,ename:String,job:String,hiredate:String,mgr:String,sal:Long,comm:String,deptno:Long)
  • <3>. 生成DataSets並查詢數據:
    • val empDS = empDF.as[Emp]
      
       查詢工資大於3000的員工
       empDS.filter(_.sal > 3000).show
      
       查看10號部門的員工
       empDS.filter(_.deptno == 10).show
      複製代碼
  • <4>. 多表查詢:
    • a. 建立部門表:
      • val deptRDD=sc.textFile("/XXXX/dept.csv").map(_.split(","))
          case class Dept(deptno:Int,dname:String,loc:String)
          val deptDS = deptRDD.map(x=>Dept(x(0).toInt,x(1),x(2))).toDS
        複製代碼
      複製代碼
    • b. 建立員工表:
      • case class Emp(empno:Int,ename:String,job:String,mgr:String,hiredate:String,sal:Int,comm:String,deptno:Int)
        val empRDD = sc.textFile("/XXXX/emp.csv").map(_.split(","))
        val empDS = empRDD.map(x => Emp(x(0).toInt,x(1),x(2),x(3),x(4),x(5).toInt,x(6),x(7).toInt)).toDS
        複製代碼
    • c. 執行多表查詢:等值連接
      • val result = deptDS.join(empDS,"deptno")
        
        另外一種寫法:注意有三個等號
        val result = deptDS.joinWith(empDS,deptDS("deptno")=== empDS("deptno"))
        joinWith和join的區別是鏈接後的新Dataset的schema會不同
        複製代碼
  • <5>. 查看執行計劃:
    • result.explain

11. Spark SQL 進階

(1) 使用數據源

  • <1>. 什麼是parquet文件
    • Parquet是列式存儲格式的一種文件類型,列式存儲有如下的核心:
      • 能夠跳過不符合條件的數據,只讀取須要的數據,下降IO數據量。
      • 壓縮編碼能夠下降磁盤存儲空間。因爲同一列的數據類型是同樣的,可使用更高效的壓縮編碼(例如Run Length Encoding和Delta Encoding)進一步節約存儲空間。
      • 只讀取須要的列,支持向量運算,可以獲取更好的掃描性能。
      • Parquet格式是Spark SQL的默認數據源,可經過spark.sql.sources.default配置
  • <2>. 使用Load/Save函數
    • load函數是加載數據,save是存儲數據。
    • e.g.:
      • 讀取 users.parquet 文件(Spark自帶的示例文件)
          val userDF = spark.read.load("/root/users.parquet")
          查看結構:
          userDF.printSchema
          查看內容:
          userDF.show
          
          讀取json文件:
          val userDF = spark.read.load("/root/emp.json") ——>報錯
          正確方法:
          val userDF = spark.read.format("json").load("/root/emp.json")
          val userDF = spark.read.json("/root/emp.json")
        
          保存parquet文件到本地路徑:
          userDF.select($"name",$"favorite_color").write.save("/root/parquet")
          
          讀取剛寫入的文件:
          val userDF1 = spark.read.load("/root/parquet/part-00000-888d505a-7d51-4a50-aaf5-2bbdb56e67a1.snappy.parquet") --> 不推薦
          
          生產:(直接讀目錄)
          val userDF2 = spark.read.load("/usr/local/tmp_files/parquet")
        複製代碼
    • 關於save函數:
      • 調用save函數的時候,能夠指定存儲模式,追加、覆蓋等等

      • 能夠採用SaveMode執行存儲操做,SaveMode定義了對數據的處理模式。須要注意的是,這些保存模式不使用任何鎖定,不是原子操做。此外,當使用Overwrite方式執行時,在輸出新數據以前原數據就已經被刪除。SaveMode詳細介紹以下表:

      • userDF2.write.save("/root/parquet") ——>報錯
          	
        save的時候覆蓋:
        userDF2.write.mode("overwrite").save("/root/parquet")
        將結果保存成表:
        userDF2.select($"name").write.saveAsTable("table1")
        查看數據:
        spark.sql("select * from table2").show
        
        也能夠進行分區、分桶等操做:partitionBy、bucketBy
        複製代碼
  • <3>. Parquet文件
    • Parquet是一個列格式並且用於多個數據處理系統中。Spark SQL提供支持對於Parquet文件的讀寫,也就是自動保存原始數據的schema。當寫Parquet文件時,全部的列被自動轉化爲nullable,由於兼容性的緣故。
    • e.g.:
      • 讀入json格式的數據,將其轉換成parquet格式,並建立相應的表來使用SQL進行查詢。(把數據讀進來,再寫出去,就是Parquet文件)
        
        讀入文件:
        val empDF = spark.read.json("/root/emp.json")
        
        寫出文件:
        empDF.write.mode("overwrite").save("/root/parquet")
        empDF.write.mode("overwrite").parquet("/root/parquet")
          	
        建表查詢:
        val emp1 = spark.read.parquet("/root/parquet")
        emp1.createOrReplaceTempView("emp1")
        spark.sql("select * from emp1").show
        複製代碼
  • <4>. Schema的合併:
    • Parquet支持Schema evolution(Schema演變,即:合併)。用戶能夠先定義一個簡單的Schema,而後逐漸的向Schema中增長列描述。經過這種方式,用戶能夠獲取多個有不一樣Schema但相互兼容的Parquet文件。
    • e.g.:
      • 經過RDD來建立DataFrame:
        val df1 = sc.makeRDD(1 to 5).map( i => (i,i*2)).toDF("single","double") ——>"single","double"  是表結構
        df1.show
        df1.write.mode("overwrite").save("/root/test_table/key=1")
        val df2 = sc.makeRDD(6 to 10).map( i => (i,i*3)).toDF("single","triple")
        df2.show
        df2.write.mode("overwrite").save("/root/test_table/key=2")
          	
        合併兩個部分:
        val df3 = spark.read.parquet("/root/tmp_files/test_table")
        val df3 = spark.read.option("mergeSchema",true).parquet("/root/tmp_files/test_table")
        複製代碼
  • <5>. JSON Datasets
    • Spark SQL能自動解析JSON數據集的Schema,讀取JSON數據集爲DataFrame格式。讀取JSON數據集方法爲SQLContext.read().json()。該方法將String格式的RDD或JSON文件轉換爲DataFrame。
    • 須要注意的是,這裏的JSON文件不是常規的JSON格式。JSON文件每一行必須包含一個獨立的、自知足有效的JSON對象。若是用多行描述一個JSON對象,會致使讀取出錯。讀取JSON數據集示例以下:
    • 讀取Json文件,生成DataFrame:
      val peopleDF = spark.read.json("/usr/local/tmp_files/people.json")
      打印Schema結構信息:
      peopleDF.printSchema
      建立臨時視圖:
      peopleDF.createOrReplaceTempView("peopleView")
      執行查詢:
      spark.sql("select * from peopleView").show
      Spark SQL 支持統一的訪問接口。對於不一樣的數據源,讀取進來,生成DataFrame後,操做徹底同樣。
      複製代碼
  • <6>. 使用JDBC
    • Spark SQL一樣支持經過JDBC讀取其餘數據庫的數據做爲數據源。
    • Spark加載MySQL:
      • spark-shell --master spark://XXXX:7077 --jars /XXXX/.jar --driver-class-path /XXXX/.jar
    • Spark鏈接MySQL:
      • 方法一:
        • val mysqlDF = spark.read.format("jdbc").option("url","jdbc:mysql://XXXX:3306/company?serverTimezone=UTC&characterEncoding=utf-8").option("user","root").option("password","123456").option("driver","com.mysql.cj.jdbc.Driver").option("dbtable","emp").load
          
            mysqlDF.show```
          複製代碼
      • 方式二:定義一個Properties類
        • import java.util.Properties
            val mysqlProps = new Properties()
            mysqlProps.setProperty("user","root")
            mysqlProps.setProperty("password","123456")
            
            val mysqlDF1 = spark.read.jdbc("jdbc:mysql://XXXX:3306/company?serverTimezone=UTC&characterEncoding=utf-8","emp",mysqlProps)
            
            mysqlDF1.show	```
          複製代碼
  • <7>. 使用Hive Table
    • a. 準備工做:
      • 搭建好Hive的環境(須要Hadoop)
      • 配置Spark SQL支持Hive:
        • 將如下文件拷貝到$SPARK_HOME/conf的目錄下,便可
          • $HIVE_HOME/conf/hive-site.xml
          • $HADOOP_CONF_DIR/core-site.xml
          • $HADOOP_CONF_DIR/hdfs-site.xml
        • 重啓Spark
    • b. 使用Spark Shell操做Hive
      • 啓動Hadoop、Hive
      • 啓動Spark
        • 啓動spark-sql的時候,須要使用--jars指定mysql的驅動程序
      • 建立表:spark.sql("create table spark.emp1(empno Int,ename String,job String,mgr String,hiredate String,sal Int,comm String,deptno Int)row format delimited fields terminated by ','")
      • 導入數據:spark.sql("load data local inpath '/root/emp.csv' overwrite into table spark.emp1")
      • 查詢數據:spark.sql("select * from spark.emp1").show

(2) 在IDE中開發Spark SQL

  • <1>. 建立DataFrame
    • a. StructType方式
      • package Demo
        
          import org.apache.spark.sql.SparkSession
          import org.apache.spark.sql.types.StructType
          import org.apache.spark.sql.types.StructField
          import org.apache.spark.sql.types.IntegerType
          import org.apache.spark.sql.types.StringType
          import org.apache.spark.sql.Row
          import org.apache.log4j.Logger
          import org.apache.log4j.Level
          
          /*建立DataFrame StructType方式*/
          object Demo01 {
          
            def main(args: Array[String]): Unit = {
          
              Logger.getLogger("org.apache.spark").setLevel(Level.ERROR)
          		Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.OFF)
              
              // 建立Spark Session對象
              val spark = SparkSession.builder().master("local").appName("Demo1").getOrCreate()
          
              // 從指定的地址建立RDD對象
              /*1    Tom    12
          		 *2    Mary	  13
          		 *3    Lily	  15
               * */
              val personRDD = spark.sparkContext.textFile("/Users/apple/Documents/student.txt").map(_.split("\t"))
          
              // 經過StructType方式指定Schema
              val schema = StructType(
                List(
                  StructField("id", IntegerType),
                  StructField("name", StringType),
                  StructField("age", IntegerType)))
          
              // 將RDD映射到rowRDD上,映射到Schema上
              val rowRDD = personRDD.map(p => Row(p(0).toInt, p(1), p(2).toInt))
              val personDataFrame = spark.createDataFrame(rowRDD, schema)
          
              // 註冊視圖
              personDataFrame.createOrReplaceTempView("t_person")
          
              //執行SQL語句
              val df = spark.sql("select * from t_person order by age desc")
          
              df.show()
        
          spark.stop()
        }
        複製代碼
      }
      複製代碼
    • b. case Class方式
      • package Demo
        
          import org.apache.spark.sql.SparkSession
          import org.apache.spark.sql.types.StructType
          import org.apache.spark.sql.types.StructField
          import org.apache.spark.sql.types.IntegerType
          import org.apache.spark.sql.types.StringType
          import org.apache.spark.sql.Row
          import org.apache.log4j.Logger
          import org.apache.log4j.Level
          
          /*使用case Class來建立DataFrame*/
          object Demo02 {
          
            def main(args: Array[String]): Unit = {
              Logger.getLogger("org.apache.spark").setLevel(Level.ERROR)
              Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.OFF)
          
              //建立Spark Session對象
              val spark = SparkSession.builder().master("local").appName("Demo2").getOrCreate()
          
              //從指定的地址建立RDD對象
              val lineRDD = spark.sparkContext.textFile("").map(_.split("\t"))
          
              //把數據與case class作匹配
              val studentRDD = lineRDD.map(x => Student(x(0).toInt, x(1), x(2).toInt))
          
              //生成DataFrame
              import spark.sqlContext.implicits._
              val studentDF = studentRDD.toDF
          
              //註冊視圖 執行SQL
              studentDF.createOrReplaceTempView("student")
          
              spark.sql("select * from student").show
          
              spark.stop()
            }
          }
          
          //定義case class
          case class Student(stuId: Int, stuName: String, stuAge: Int)
        複製代碼
  • <2>. 寫入MySQL
    • package Demo
      
      
      import org.apache.spark.sql.SparkSession
      import org.apache.spark.sql.types.StructType
      import org.apache.spark.sql.types.StructField
      import org.apache.spark.sql.types.IntegerType
      import org.apache.spark.sql.types.StringType
      import org.apache.spark.sql.Row
      import org.apache.log4j.Logger
      import org.apache.log4j.Level
      import java.util.Properties
      
      /*寫入MySQL*/
      object Demo03 {
        
        def main(args: Array[String]): Unit = {
          Logger.getLogger("org.apache.spark").setLevel(Level.ERROR)
      		Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.OFF)
      
          //建立Spark Session對象
          val spark = SparkSession.builder().master("local").appName("Demo3").getOrCreate()
      
          //從指定的地址建立RDD對象
          val lineRDD = spark.sparkContext.textFile("").map(_.split("\t"))
          
          //經過StructType方式指定Schema
          val schema = StructType(
            List(
              //字段與MySQL表中字段對應一致  
              StructField("personID", IntegerType),
              StructField("personName", StringType),
              StructField("personAge", IntegerType)))
              
         //將RDD映射到rowRDD上,映射到Schema上
         val rowRDD = lineRDD.map(p => Row(p(0).toInt,p(1),p(2).toInt))
         val personDataFrame = spark.createDataFrame(rowRDD, schema)
         
         personDataFrame.createOrReplaceTempView("myperson")
         
         val result = spark.sql("select * from myperson")
         
         result.show()
         
         //把結果存入到mysql中
         val props = new Properties()
         props.setProperty("user", "root")
         props.setProperty("password", "123456")
         //append追加模式
         result.write.mode("append").jdbc("jdbc:mysql://localhost:3306/company?serverTimezone=UTC&characterEncoding=utf-8", "student", props)
         
         spark.stop()
        
        }
      }
      複製代碼
  • <3>. 使用Spark SQL 讀取Hive中的數據,將計算結果存入mysql
    • package Demo
      
      import org.apache.spark.sql.SparkSession
      import java.util.Properties
      
      /*使用Spark SQL 讀取Hive中的數據,將計算結果存入mysql*/
      //命令:./bin/spark-submit --master spark://node3:7077 --jars /usr/local/tmp_files/mysql-connector-java-8.0.11.jar --driver-class-path /usr/local/tmp_files/mysql-connector-java-8.0.11.jar --class day0410.Demo4 /usr/local/tmp_files/Demo4.jar 
      object Demo4 {
        def main(args: Array[String]): Unit = {
      
          //建立SparkSession
          val spark = SparkSession.builder().appName("Demo4").enableHiveSupport().getOrCreate()
      
          //執行SQL
          val result = spark.sql("select deptno,count(1) from company.emp group by deptno")
      
          //將結果保存到mysql中
          val props = new Properties()
          props.setProperty("user", "root")
          props.setProperty("password", "123456")
      
          result.write.mode("append").jdbc("jdbc:mysql://192.168.109.1:3306/company?serverTimezone=UTC&characterEncoding=utf-8", "emp_stat", props)
      
          spark.stop()
        }
      }
      複製代碼

(3) Spark SQL 性能優化

  • <1>. 在內存中緩存數據

    • 直接讀取內存的值來提升性能。
    • 經過spark.cacheTable("tableName")或者dataFrame.cache()。使用spark.uncacheTable("tableName")來從內存中去除table。
    • e.g.:
      • 操做mysql,啓動spark shell 時,須要:
          ./bin/spark-shell --master spark://node3:7077 --jars /usr/local/tmp_files/mysql-connector-java-8.0.11.jar --driver-class-path /usr/local/tmp_files/mysql-connector-java-8.0.11.jar
          
          val mysqlDF = spark.read.format("jdbc").option("driver","com.mysql.jdbc.Driver").option("url","jdbc:mysql://192.168.109.1:3306/company?serverTimezone=UTC&characterEncoding=utf-8").option("user","root").option("password","123456").option("dbtable","emp").load
          
          mysqlDF.show
          mysqlDF.createOrReplaceTempView("emp")
          
          spark.sqlContext.cacheTable("emp")   ----> 標識這張表能夠被緩存,數據尚未真正被緩存
          spark.sql("select * from emp").show  ----> 依然讀取mysql
          spark.sql("select * from emp").show  ----> 從緩存中讀取數據
          
          spark.sqlContext.clearCache
          
          清空緩存後,執行查詢,會觸發查詢mysql數據庫。
        複製代碼
  • <2>. 性能優化相關參數

    • a. 將數據緩存到內存中的相關優化參數:

      • spark.sql.inMemoryColumnarStorage.compressed

        • 默認爲 true
        • Spark SQL 將會基於統計信息自動地爲每一列選擇一種壓縮編碼方式。
      • spark.sql.inMemoryColumnarStorage.batchSize

        • 默認值:10000
        • 緩存批處理大小。緩存數據時, 較大的批處理大小能夠提升內存利用率和壓縮率,但同時也會帶來 OOM(Out Of Memory)的風險。
    • b. 其餘性能相關的配置選項(不過不推薦手動修改,可能在後續版本自動的自適應修改)

      • spark.sql.files.maxPartitionBytes
        • 默認值:128 MB
        • 讀取文件時單個分區可容納的最大字節數
      • spark.sql.files.openCostInBytes
        • 默認值:4M
        • 打開文件的估算成本, 按照同一時間可以掃描的字節數來測量。當往一個分區寫入多個文件的時候會使用。高估更好, 這樣的話小文件分區將比大文件分區更快 (先被調度)。
      • spark.sql.autoBroadcastJoinThreshold
        • 默認值:10M
        • 用於配置一個表在執行 join 操做時可以廣播給全部 worker 節點的最大字節大小。經過將這個值設置爲 -1 能夠禁用廣播。注意,當前數據統計僅支持已經運行了 ANALYZE TABLE COMPUTE STATISTICS noscan 命令的 Hive Metastore 表。
      • spark.sql.shuffle.partitions
        • 默認值:200
        • 用於配置 join 或聚合操做混洗(shuffle)數據時使用的分區數。

12. 認識 Spark Streaming

(1) Spark Streaming 簡介

  • 流式計算框架(相似於Storm)
  • 經常使用的實時計算引擎(流式計算)
    • <1>. Apache Storm:真正的流式計算
    • <2>. Spark Streaming :嚴格上來講不是真正的流式計算(實時計算),把連續的流式數據,當成不連續的RDD,本質是一個離散計算(不連續)
    • <3>. Apache Flink:真正的流式計算,與Spark Streaming相反, 把離散的數據,當成流式數據來處理
    • <4>. JStorm
  • Spark Streaming makes it easy to build scalable fault-tolerant streaming applications.(易於構建靈活的、高容錯的流式系統)
  • Spark Streaming是核心Spark API的擴展,可實現可擴展、高吞吐量、可容錯的實時數據流處理。數據能夠從諸如Kafka,Flume,Kinesis或TCP套接字等衆多來源獲取,而且可使用由高級函數(如map,reduce,join和window)開發的複雜算法進行流數據處理。最後,處理後的數據能夠被推送到文件系統,數據庫和實時儀表板。並且,還能夠在數據流上應用Spark提供的機器學習和圖處理算法。

(2) Spark Streaming 的特色

  • <1>. 易用,已經集成到Spark中
  • <2>. 容錯性:底層RDD,RDD自己具備容錯機制
  • <3>. 支持多種語言:Java Scala Python

(3) Spark Streaming的內部結構

  • 在內部,它的工做原理以下。Spark Streaming接收實時輸入數據流,並將數據切分紅批,而後由Spark引擎對其進行處理,最後生成「批」形式的結果流。
  • Spark Streaming將連續的數據流抽象爲discretizedstream或DStream。在內部DStream 由一個RDD序列表示。

13. Spark Streaming 基礎

(1) Spark Streaming 官方示例

  • <1>. 介紹:
    • 向Spark Streaming中發送字符串,Spark 接收到之後進行計數
  • <2>. 準備工做:
    • netcat網絡工具(yum install nc.x86_64
    • **注意:**總核數大於等於2,一個核心用於接收數據,另外一個用於處理數據
  • <3>. 操做:
    • 啓動同一Linux系統的兩個窗口,一個負責輸入,一個負責監聽
    • 窗口1:nc -l 1234-l監聽模式;1234端口號)
    • 窗口2:run-example streaming.NetworkWordCount localhost 1234
    • 在窗口1輸入文本信息,窗口2監聽並進行計數統計

(2) 自寫 Spark Streaming 官方示例

  • MyNetworkWordCount.scala
    • /**
        *
        * @ClassName: MyNetworkWordCount
        * @Description
        * @Author: YBCarry
        * @Date2019-05-13 20:49
        * @Version: V1.0
        *
        **/
      import org.apache.spark.streaming.StreamingContext
      import org.apache.spark.SparkConf
      import org.apache.spark.streaming.Seconds
      import org.apache.spark.storage.StorageLevel
      import org.apache.log4j.Logger
      import org.apache.log4j.Level
      import org.apache.spark.internal.Logging
      /*
       * 自寫流式計算程序
       *
       * 知識點:
       * 一、建立一個StreamingContext對象  -->  核心:建立一個DStream
       * 二、DStream的表現形式:就是一個RDD
       * 三、使用DStream把連續的數據流變成不連續的RDD
       *
       * spark Streaming 最核心的內容
       */
      object MyNetworkWordCount {
        def main(args: Array[String]): Unit = {
      
      
          //建立一個Streaming Context對象
          //local[2] 表示開啓了兩個線程
          val conf = new SparkConf().setAppName("MyNetworkWordCount").setMaster("local[2]")
          //Seconds(3) 表示採樣時間間隔
          val ssc = new StreamingContext(conf, Seconds(3))
      
          //建立DStream 從netcat服務器上接收數據
          val lines = ssc.socketTextStream("172.16.194.128", 1234, StorageLevel.MEMORY_ONLY)
      
          //lines中包含了netcat服務器發送過來的數據
          //分詞操做
          val words = lines.flatMap(_.split(" "))
      
          //計數
          val wordPair = words.transform(x => x.map(x => (x, 1)))
      
          //打印結果
          wordPair.print()
      
          //啓動StreamingContext 進行計算
          ssc.start()
      
          //等待任務結束
          ssc.awaitTermination()
      
        }
      }
      複製代碼

14. Spark Streaming 進階

(1) StreamingContext對象詳解

  • 初始化StreamingContext:
    • 方式一:從SparkConf對象中建立:
    • 方式二:從一個現有的SparkContext實例中建立
  • 程序中的幾點說明:
    • appName參數是應用程序在集羣UI上顯示的名稱。
    • master是Spark,Mesos或YARN集羣的URL,或者一個特殊的「local [*]」字符串來讓程序以本地模式運行。
    • 當在集羣上運行程序時,不須要在程序中硬編碼master參數,而是使用spark-submit提交應用程序並將master的URL以腳本參數的形式傳入。可是,對於本地測試和單元測試,您能夠經過「local[*]」來運行Spark Streaming程序(請確保本地系統中的cpu核心數夠用)。
    • StreamingContext會內在的建立一個SparkContext的實例(全部Spark功能的起始點),你能夠經過ssc.sparkContext訪問到這個實例。
    • 批處理的時間窗口長度必須根據應用程序的延遲要求和可用的集羣資源進行設置。
  • 注意:
    • 一旦一個StreamingContextt開始運做,就不能設置或添加新的流計算。
    • 一旦一個上下文被中止,它將沒法從新啓動。
    • 同一時刻,一個JVM中只能有一個StreamingContext處於活動狀態。
    • StreamingContext上的stop()方法也會中止SparkContext。 要僅中止StreamingContext(保持SparkContext活躍),請將stop() 方法的可選參數stopSparkContext設置爲false。
    • 只要前一個StreamingContext在下一個StreamingContext被建立以前中止(不中止SparkContext),SparkContext就能夠被重用來建立多個StreamingContext。

(2) 離散流(DStreams):Discretized Streams

  • 把連續的數據變成不連續的RDD
  • 由於DStream的特性,致使,Spark Streaming不是真正的流式計算
  • DiscretizedStream或DStream 是Spark Streaming對流式數據的基本抽象。它表示連續的數據流,這些連續的數據流能夠是從數據源接收的輸入數據流,也能夠是經過對輸入數據流執行轉換操做而生成的經處理的數據流。在內部,DStream由一系列連續的RDD表示,以下圖:
  • 舉例分析:在以前的NetworkWordCount的例子中,咱們將一行行文本組成的流轉換爲單詞流,具體作法爲:將flatMap操做應用於名爲lines的DStream中的每一個RDD上,以生成words DStream的RDD。以下圖所示:
  • 可是DStream和RDD也有區別,下面畫圖說明:

(3) 轉換操做(transformation)

  • transform(func)

    • 經過RDD-to-RDD函數做用於源DStream中的各個RDD,能夠是任意的RDD操做,從而返回一個新的RDD
    • 舉例:在NetworkWordCount中,也可使用transform來生成元組對
  • updateStateByKey(func)

    • 操做容許不斷用新信息更新它的同時保持任意狀態。
    • 定義狀態:狀態能夠是任何的數據類型
    • 定義狀態更新函數:怎樣利用更新前的狀態和從輸入流裏面獲取的新值更新狀態
  • 重寫NetworkWordCount程序,累計每一個單詞出現的頻率(注意:累計)

    • TotalNetworkWordCount.scala
    • package test.Network
      
          import org.apache.log4j.{Level, Logger}
          import org.apache.spark.SparkConf
          import org.apache.spark.storage.StorageLevel
          import org.apache.spark.streaming.{Seconds, StreamingContext}
          
          /**
            *
            * @ClassName: TotalNetworkWordCount
            * @Description: 實現累加操做
            * @Author: YBCarry
            * @Date2019-05-15 16:05
            * @Version: V1.0
            *
            **/
          object TotalNetworkWordCount {
          
            def main(args: Array[String]): Unit = {
          
              Logger.getLogger("org.apache.spark").setLevel(Level.ERROR)
              Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.OFF)
          
              //建立一個Streaming Context對象
              //local[2] 表示開啓了兩個線程
              val conf = new SparkConf().setAppName("MyNetworkWordCount").setMaster("local[2]")
              //Seconds(3) 表示採樣時間間隔
              val ssc = new StreamingContext(conf, Seconds(3))
          
              //設置檢查點目錄,保存以前都的狀態信息
              ssc.checkpoint("")
          
              //建立DStream
              val lines = ssc.socketTextStream("bigdata01", 1234, StorageLevel.MEMORY_ONLY)
          
              //分割
              val words = lines.flatMap(_.split(" "))
          
              //計數
        //    val wordPair = words.map((_, 1))
              val wordPair = words.transform( x => x.map(x => (x, 1)))
          
              //定義一個值函數 ;累加計數
              /*
              * 接收兩個參數
              * currentValues —— 當前值
              * previousValue ——歷史值
              * */
              val addFunc = (currentValues : Seq[Int], previousValues : Option[Int]) => {
          
                //累加當前的序列
                val currrentTotal = currentValues.sum
                //累加歷史值
                Some(currrentTotal + previousValues.getOrElse(0))
              }
          
              //累加運算
              val total = wordPair.updateStateByKey(addFunc)
          
              total.print()
          
              ssc.start()
      
          ssc.awaitTermination()
        }
      }
      複製代碼
    複製代碼

(4) 窗口操做

  • Spark Streaming還提供了窗口計算功能,容許在數據的滑動窗口上應用轉換操做。下圖說明了滑動窗口的工做方式:

  • 如圖所示,每當窗口滑過originalDStream時,落在窗口內的源RDD被組合並被執行操做以產生windowed DStream的RDD。在上面的例子中,操做應用於最近3個時間單位的數據,並以2個時間單位滑動。這代表任何窗口操做都須要指定兩個參數。

    • 窗口長度(windowlength) - 窗口的時間長度(上圖的示例中爲:3)。
    • 滑動間隔(slidinginterval) - 兩次相鄰的窗口操做的間隔(即每次滑動的時間長度)(上圖示例中爲:2)。
    • 這兩個參數必須是源DStream的批間隔的倍數(上圖示例中爲:1)。
  • e.g.: 假設對以前的單詞計數的示例進行擴展,每10秒鐘對過去30秒的數據進行wordcount。則在最近30秒的pairs DStream數據中對(word, 1)鍵值對應用reduceByKey操做。這是經過使用reduceByKeyAndWindow操做完成的。

  • package test.NetworkByWindow
    
      import org.apache.log4j.{Level, Logger}
      import org.apache.spark.SparkConf
      import org.apache.spark.storage.StorageLevel
      import org.apache.spark.streaming.{Seconds, StreamingContext}
      
      /**
        *
        * @ClassName: NetworkWordCountByWindow
        * @Description: 每10秒讀取過去30秒的數據
        * @Author: YBCarry
        * @Date2019-05-15 17:00
        * @Version: V1.0
        *
        **/
      object NetworkWordCountByWindow {
      
        def main(args: Array[String]): Unit = {
      
          Logger.getLogger("org.apache.spark").setLevel(Level.ERROR)
          Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.OFF)
      
          //建立一個Streaming Context對象
          //local[2] 表示開啓了兩個線程
          val conf = new SparkConf().setAppName("MyNetworkWordCount").setMaster("local[2]")
          //Seconds(3) 表示採樣時間間隔
          val ssc = new StreamingContext(conf, Seconds(3))
      
          //設置檢查點目錄,保存以前都的狀態信息
          ssc.checkpoint("")
      
          //建立DStream
          val lines = ssc.socketTextStream("bigdata01", 1234, StorageLevel.MEMORY_ONLY)
      
          //分割 每一個單詞計數
          val words = lines.flatMap(_.split(" ")).map((_, 1))
      
          /*
          * 窗口操做
          * 參數說明:要進行的操做  窗口的大小(30s)  窗口移動距離(12s) ——> 採樣時間(3)的整數倍 
          * */
          val result = words.reduceByKeyAndWindow((x : Int, y : Int) => (x + y), Seconds(30), Seconds(12))
        }
      
      }
    複製代碼

15. Spark 數據源

(1) 輸入DStreams和接收器

  • 輸入DStreams表示從數據源獲取輸入數據流的DStreams。在NetworkWordCount例子中,lines表示輸入DStream,它表明從netcat服務器獲取的數據流。每個輸入流DStream和一個Receiver對象相關聯,這個Receiver從源中獲取數據,並將數據存入內存中用於處理。
  • 輸入DStreams表示從數據源獲取的原始數據流。Spark Streaming擁有兩類數據源:
    • 基本源(Basic sources):這些源在StreamingContext API中直接可用。例如文件系統、套接字鏈接、Akka的actor等
    • 高級源(Advanced sources):這些源包括Kafka,Flume,Kinesis,Twitter等等。
  • 下面經過具體的案例,詳細說明:

(2) 基本源

  • <1>. 文件流:經過監控文件系統的變化,如有新文件添加,則將它讀入並做爲數據流

    • 注意:
      • ① 這些文件具備相同的格式
      • ② 這些文件經過原子移動或重命名文件的方式在dataDirectory建立
      • ③ 若是在文件中追加內容,這些追加的新數據也不會被讀取。
    • Spark Streaming監控一個文件夾,若是有變化,則把變化採集過來
    • import org.apache.log4j.{Level, Logger}
      import org.apache.spark.SparkConf
      import org.apache.spark.streaming.{Seconds, StreamingContext}
      
      /**
        *
        * @ClassName: FileStreaming
        * @Description
        * @Author: YBCarry
        * @Date2019-05-16 09:24
        * @Version: V1.0
        *
        **/
      object FileStreaming {
      
        def main(args: Array[String]): Unit = {
      
          Logger.getLogger("org.apache.spark").setLevel(Level.ERROR)
          Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.OFF)
      
          //建立一個Streaming Context對象
          //local[2] 表示開啓了兩個線程
          val conf = new SparkConf().setAppName("MyFileStreaming").setMaster("local[2]")
          //Seconds(3) 表示採樣時間間隔
          val ssc = new StreamingContext(conf, Seconds(10))
      
          //監控目錄,讀取產生的新文件
          val lines = ssc.textFileStream("\\Users\\apple\\學習\\SparkFiles")
      
          lines.print()
      
          ssc.start()
          ssc.awaitTermination()
      
        }
      
      }
      複製代碼
    • 注意:須要在原文件中編輯,而後拷貝一份。
  • <2>. RDD隊列流

    • 使用streamingContext.queueStream(queueOfRDD)建立基於RDD隊列的DStream,用於調試Spark Streaming應用程序。
    • package test.RDDQueue
      
      
      import org.apache.log4j.{Level, Logger}
      import org.apache.spark.SparkConf
      import org.apache.spark.rdd.RDD
      import org.apache.spark.streaming.{Seconds, StreamingContext}
      
      import scala.collection.mutable.Queue
      
      /**
        *
        * @ClassName: RDDQueueStream
        * @Description: RDD隊列流
        * @Author: YBCarry
        * @Date2019-05-16 10:48
        * @Version: V1.0
        *
        **/
      object RDDQueueStream {
      
        def main(args: Array[String]): Unit = {
      
          Logger.getLogger("org.apache.spark").setLevel(Level.ERROR)
          Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.OFF)
      
          //建立一個Streaming Context對象
          //local[2] 表示開啓了兩個線程
          val conf = new SparkConf().setAppName("MyRDDQueueStream").setMaster("local[2]")
          //Seconds(3) 表示採樣時間間隔
          val ssc = new StreamingContext(conf, Seconds(3))
      
          //建立隊列 RDD[Int]
          val rddQueue = new Queue[RDD[Int]]()
      
          //向隊列裏添加數據 (建立數據源)
          for (i <- 1 to 3) {
      
            rddQueue += ssc.sparkContext.makeRDD(1 to 10)
      
            //便於觀察
            Thread.sleep(1000)
          }
      
          //從隊列中接收數據,建立DStream
          val inputDStream = ssc.queueStream(rddQueue)
      
          //處理數據
          val result = inputDStream.map(x => (x, x * 2))
          result.print()
      
          ssc.start()
          ssc.awaitTermination()
      
        }
      
      }
      複製代碼
  • <3>. 套接字流:經過監聽Socket端口來接收數據

(3) 高級源

  • <1>. Spark Streaming接收Flume數據
    • a. 基於Flume的Push模式:
      • Flume被用於在Flume agents之間推送數據,在這種方式下,Spark Streaming能夠很方便的創建一個receiver,起到一個Avro agent的做用。Flume能夠將數據推送到改receiver。
    • 如下爲配置步驟:
      • **第一步:**Flume的配置文件
        • MyFlumeStream01.conf
        • #定義agent名, source、channel、sink的名稱
          a4.sources = r1
          a4.channels = c1
          a4.sinks = k1
          
          #具體定義source
          a4.sources.r1.type = spooldir
          a4.sources.r1.spoolDir = /usr/local/tmp_files/logs
          
          #具體定義channel
          a4.channels.c1.type = memory
          a4.channels.c1.capacity = 10000
          a4.channels.c1.transactionCapacity = 100
          
          #具體定義sink
          a4.sinks = k1
          a4.sinks.k1.type = avro
          a4.sinks.k1.channel = c1
          a4.sinks.k1.hostname = bigdata01
          a4.sinks.k1.port = 1234
          
          #組裝source、channel、sink
          a4.sources.r1.channels = c1
          a4.sinks.k1.channel = c1
          複製代碼
      • **第二步:**Spark Streaming程序
        • package test.Flume
          
          import org.apache.log4j.{Level, Logger}
          import org.apache.spark.SparkConf
          import org.apache.spark.streaming.flume.FlumeUtils
          import org.apache.spark.streaming.{Seconds, StreamingContext}
          
          /**
            *
            * @ClassName: MyFlumeStream
            * @Description: flume將數據推送給Spark Streaming 使用push
            * @Author: YBCarry
            * @Date2019-05-16 14:01
            * @Version: V1.0
            *
            **/
          object MyFlumeStream01 {
          
            def main(args: Array[String]): Unit = {
          
              Logger.getLogger("org.apache.spark").setLevel(Level.ERROR)
              Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.OFF)
          
              //建立一個Streaming Context對象
              //local[2] 表示開啓了兩個線程
              val conf = new SparkConf().setAppName("MyRDDQueueStream").setMaster("local[2]")
              //Seconds(3) 表示採樣時間間隔
              val ssc = new StreamingContext(conf, Seconds(3))
          
              //對接Flume
              //建立一個Flumeevent從flume中接收puch來的數據(也是DStream)
              //flume將數據push到localhost:1234,Spark Stream在這裏監聽
              val flumeEventDStream = FlumeUtils.createStream(ssc, "bigdata01", 1234)
          
              //將Flumeevent中的事件轉換成字符串
              val lineDStream = flumeEventDStream.map(e => {
                new String(e.event.getBody.array)
              })
          
              //輸出結果
              lineDStream.print()
          
              ssc.start()
              ssc.awaitTermination()
          
             }
          }
          複製代碼
      • **第三步:**測試
        • 啓動Flume
          • flume-ng agent -n a4 -f Spark/MyFlumeStream01.conf -c conf -Dflume.root.logger=INFO,console
        • 啓動Spark Streaming程序
        • 拷貝日誌文件到/root/training/logs目錄
        • 觀察輸出,採集到數據
    • b. 基於Custom Sink的Pull模式
      • 不一樣於Flume直接將數據推送到Spark Streaming中,第二種模式經過如下條件運行一個正常的Flume sink。Flume將數據推送到sink中,而且數據保持buffered狀態。Spark Streaming使用一個可靠的Flume接收器和轉換器從sink拉取數據。只要當數據被接收而且被Spark Streaming備份後,轉換器才運行成功。
      • 這樣,與第一種模式相比,保證了很好的健壯性和容錯能力,這種模式須要爲Flume配置一個正常的sink。
      • 如下爲配置步驟:
        • **第一步:**Flume的配置文件

          • FlumeLogPull.conf
          • a1.channels = c1
              a1.sinks = k1
              a1.sources = r1
              
              a1.sources.r1.type = spooldir
              a1.sources.r1.spoolDir = /usr/local/tmp_files/logs
              
              a1.channels.c1.type = memory
              a1.channels.c1.capacity = 100000
              a1.channels.c1.transactionCapacity = 100000
              
              a1.sinks.k1.type = org.apache.spark.streaming.flume.sink.SparkSink
              a1.sinks.k1.channel = c1
              a1.sinks.k1.hostname = bigdata01
              a1.sinks.k1.port = 1234
              
              #組裝source、channel、sink
              a1.sources.r1.channels = c1
              a1.sinks.k1.channel = c1
            
            複製代碼
        • **第二步:**Spark Streaming程序

          • 複製代碼

          package test.Flume

          import org.apache.spark.streaming.StreamingContext import org.apache.spark.SparkConf import org.apache.spark.streaming.Seconds import org.apache.spark.storage.StorageLevel import org.apache.log4j.Logger import org.apache.log4j.Level import org.apache.spark.streaming.flume.FlumeUtils

          /** *

          • @ClassName: FlumePutSink
          • @Description: 測試pull方式 使用Spark sink
          • @Author: YBCarry
          • @Date2019-05-16 15:23
          • @Version: V1.0

          **/ object FlumeLogPull {

          def main(args: Array[String]): Unit = { Logger.getLogger("org.apache.spark").setLevel(Level.ERROR) Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.OFF) //建立一個Streaming Context對象 //local[2] 表示開啓了兩個線程 val conf = new SparkConf().setAppName("FlumeLogPull").setMaster("local[2]") //Seconds(3) 表示採樣時間間隔 val ssc = new StreamingContext(conf,Seconds(3))

          //建立FlumeEvent的DStream,採用pull的方式
          val flumeEvent = FlumeUtils.createPollingStream(ssc, "172.16.194.128",1234, StorageLevel.MEMORY_ONLY)
          
          //將FlumeEvent的事件準換成字符串
          val lineDStream = flumeEvent.map( e => {
            new String(e.event.getBody.array)
          })
          
          //輸出結果
          lineDStream.print()
          
          ssc.start()
          ssc.awaitTermination()
          }
          複製代碼

          }

          複製代碼
        • **第三步:**須要的jar包

          • spark-streaming-flume-sink_2.11-2.1.0.jar拷貝到Flume的lib目錄下。
        • **第四步:**測試

          • 啓動Flume
          • 啓動Spark Streaming程序
          • 將測試數據拷貝到/root/training/logs
          • 觀察輸出

16. Spark 性能優化

(1) 概述

  • Spark的計算本質是分佈式計算,因此,Spark程序的性能可能由於集羣中的任何因素出現瓶頸:CPU、網絡帶寬、或者內存。若是在持久化RDD的時候,持久化了大量的數據,那麼Java虛擬機的垃圾回收就可能成爲一個瓶頸。Java虛擬機會按期進行垃圾回收,此時會追蹤全部Java對象,而且在垃圾回收時,找到那些已經再也不使用的對象。
  • 核心:清理舊對象,給新對象騰出空間。垃圾回收的性能開銷,是與內存中的對象數量成正比。

(2) spark內存分配

(3) Spark GC原理

(4) 減小批數據的執行時間

  • 在Spark中有幾個優化能夠減小批處理的時間:
    • <1>. 減小批數據的執行時間
      • 在Spark中有幾個優化能夠減小批處理的時間:
        • ① 數據接收的並行水平
          • 經過網絡(如kafka,flume,socket等)接收數據須要這些數據反序列化並被保存到Spark中。若是數據接收成爲系統的瓶頸,就要考慮並行地接收數據。注意,每一個輸入DStream建立一個receiver(運行在worker機器上)接收單個數據流。建立多個輸入DStream並配置它們能夠從源中接收不一樣分區的數據流,從而實現多數據流接收。例如,接收兩個topic數據的單個輸入DStream能夠被切分爲兩個kafka輸入流,每一個接收一個topic。這將在兩個worker上運行兩個receiver,所以容許數據並行接收,提升總體的吞吐量。多個DStream能夠被合併生成單個DStream,這樣運用在單個輸入DStream的transformation操做能夠運用在合併的DStream上。
        • ② 數據處理的並行水平
          • 若是運行在計算stage上的併發任務數不足夠大,就不會充分利用集羣的資源。默認的併發任務數經過配置屬性來肯定spark.default.parallelism。
        • ③ 數據序列化
          • 能夠經過改變序列化格式來減小數據序列化的開銷。在流式傳輸的狀況下,有兩種類型的數據會被序列化:
            • 輸入數據
            • 由流操做生成的持久RDD
          • 在上述兩種狀況下,使用Kryo序列化格式能夠減小CPU和內存開銷。

(5) 設置正確的批容量

  • 爲了Spark Streaming應用程序可以在集羣中穩定運行,系統應該可以以足夠的速度處理接收的數據(即處理速度應該大於或等於接收數據的速度)。這能夠經過流的網絡UI觀察獲得。批處理時間應該小於批間隔時間。
  • 根據流計算的性質,批間隔時間可能顯著的影響數據處理速率,這個速率能夠經過應用程序維持。能夠考慮WordCountNetwork這個例子,對於一個特定的數據處理速率,系統可能能夠每2秒打印一次單詞計數(批間隔時間爲2秒),但沒法每500毫秒打印一次單詞計數。因此,爲了在生產環境中維持指望的數據處理速率,就應該設置合適的批間隔時間(即批數據的容量)。
  • 找出正確的批容量的一個好的辦法是用一個保守的批間隔時間(5-10,秒)和低數據速率來測試你的應用程序。

(6) 內存調優

  • 介紹幾個比較推薦的自定義選項,它們能夠減小Spark Streaming應用程序垃圾回收的相關暫停,得到更穩定的批處理時間。
    • **Default persistence level of DStreams:**和RDDs不一樣的是,默認的持久化級別是序列化數據到內存中(DStream是StorageLevel.MEMORY_ONLY_SER,RDD是StorageLevel.MEMORY_ONLY)。即便保存數據爲序列化形態會增長序列化/反序列化的開銷,可是能夠明顯的減小垃圾回收的暫停。
    • **Clearing persistent RDDs:**默認狀況下,經過Spark內置策略(LUR),Spark Streaming生成的持久化RDD將會從內存中清理掉。若是spark.cleaner.ttl已經設置了,比這個時間存在更老的持久化RDD將會被定時的清理掉。正如前面提到的那樣,這個值須要根據Spark Streaming應用程序的操做當心設置。然而,能夠設置配置選項spark.streaming.unpersist爲true來更智能的去持久化(unpersist)RDD。這個配置使系統找出那些不須要常常保有的RDD,而後去持久化它們。這能夠減小Spark RDD的內存使用,也可能改善垃圾回收的行爲。
    • **Concurrent garbage collector:**使用併發的標記-清除垃圾回收能夠進一步減小垃圾回收的暫停時間。儘管併發的垃圾回收會減小系統的總體吞吐量,可是仍然推薦使用它以得到更穩定的批處理時間。

(7) shuffle原理

  • <1>. 優化前
  • <2>. 優化後

17. Spark MLlib庫

(1) 概述

  • MLlib is Apache Spark's scalable machine learning library.(MLlib 是 Spark 支持 Scala 的能夠擴展的機器學習庫。)
  • Spark在機器學習方面具備得天獨厚的有事,有如下幾個緣由:
    • <1>. 機器學習算法通常都有多個步驟迭代計算,須要在屢次迭代後,得到足夠小的偏差或者收斂纔會中止。
      • e.g.:
      • double wucha = 1.0
          while ( wucha >= 0.00001 ) {
        	  建模  wucha -= 某個值
          }
          
          模型計算完畢
        複製代碼
      • 當迭代使用Hadoop的MapReduce計算框架時,每次都要讀寫硬盤以及任務啓動工做,致使很大的IO開銷。而Spark基於內存的計算模型天生擅長迭代計算,只有在必要時,纔會讀寫硬盤,因此Spark是機器學習比較理想的平臺。
    • <2>. 通訊角度
      • Hadoop的MapReduce計算框架經過heartbeat方式來進行通訊和傳遞數據,執行速度慢。
        • Spark有高效的Akka和Netty通訊系統,通訊效率高。
  • SPark MLlib 是Spark 對經常使用的機器學習算法的實現庫,同時包括相關測試和數據生成器。

(2) 什麼是機器學習

  • <1>. 定義
    • A computer program is said to learn from experience E with respect to some class of tasks T and performance measure P, if its performance at tasks in T, as measured by P, improves with experience E。
    • 機器學習(Machine Learning, ML)是一門多領域交叉學科,涉及機率論、統計學、逼近論、凸分析、算法複雜度理論等多門學科。專門研究計算機怎樣模擬或實現人類的學習行爲,以獲取新的知識或技能,從新組織已有的知識結構使之不斷改善自身的性能。(經過算法使計算機可以模擬人類的判別能力)
    • **三個關鍵詞:**算法、經驗、模型評價
    • **應用:**金融反欺詐、語音識別、天然語言處理、翻譯、模式識別、智能控制等等。
  • <2>. 機器學習工做流程
    • 在數據的基礎上,經過算法構建出模型,並進行評價
      • 若是達到要求,則用該模型測試其餘數據
      • 若是不達到要求,要調整算法來從新創建模型,再次進行評估
      • 循環往復,直到得到滿意的經驗
  • <3>. 基於大數據的機器學習
    • 傳統的機器學習算法,因爲技術和單機存儲的限制,依賴於數據抽樣,只能在少許數據上使用。因此存在的問題是很難作好隨機,從而致使學習的模型不許確。
    • 在大數據上進行機器學習,能夠直接處理全量數據並進行大量迭代計算。Spark自己計算優點,適合機器學習。此外spark-shell、pyspark均可以提供及時查詢工具。

(3) MLlib

  • MLlib是Spark機器學習庫,簡化機器學習的工程實踐工做,方便擴展到更大規模。集成了通用的學習算法:分類、迴歸、聚類、協同過濾、降維等等。另外,MLlib自己在Spark中,數據清洗、SQL、建模放在一塊兒。
相關文章
相關標籤/搜索