Update(Stage4):Structured Streaming_介紹_案例

 

  • 1. 回顧和展望
    • 1.1. Spark 編程模型的進化過程
    • 1.2. Spark 的 序列化 的進化過程
    • 1.3. Spark Streaming 和 Structured Streaming
  • 2. Structured Streaming 入門案例
    • 2.1. 需求梳理
    • 2.2. 代碼實現
    • 2.3. 運行和結果驗證
  • 3. Stuctured Streaming 的體系和結構
    • 3.1. 無限擴展的表格
    • 3.2. 體系結構
  • 4. Source
    • 4.1. 從 HDFS 中讀取數據
    • 4.2. 從 Kafka 中讀取數據
  • 5. Sink
    • 5.1. HDFS Sink
    • 5.2. Kafka Sink
    • 5.3. Foreach Writer
    • 5.4. 自定義 Sink
    • 5.5. Tigger
    • 5.6. 從 Source 到 Sink 的流程
    • 5.7. 錯誤恢復和容錯語義
  • 6. 有狀態算子
    • 6.1. 常規算子
    • 6.2. 分組算子
全天目標
  1. 回顧和展望前端

  2. 入門案例java

  3. Stuctured Streaming 的體系和結構node

 

1. 回顧和展望

本章目標

Structured Streaming 是 Spark Streaming 的進化版, 若是瞭解了 Spark 的各方面的進化過程, 有助於理解 Structured Streaming 的使命和做用python

本章過程
  1. Spark 的 API 進化過程mysql

  2. Spark 的序列化進化過程算法

  3. Spark Streaming 和 Structured Streamingsql

1.1. Spark 編程模型的進化過程

目標和過程
目標

Spark 的進化過程當中, 一個很是重要的組成部分就是編程模型的進化, 經過編程模型能夠看得出來內在的問題和解決方案shell

過程
  1. 編程模型 RDD 的優勢和缺陷apache

  2. 編程模型 DataFrame 的優勢和缺陷編程

  3. 編程模型 Dataset 的優勢和缺陷

20190625103618
編程模型 解釋

RDD

rdd.flatMap(_.split(" "))
   .map((_, 1))
   .reduceByKey(_ + _)
   .collect
  • 針對自定義數據對象進行處理, 能夠處理任意類型的對象, 比較符合面向對象

  • RDD 沒法感知到數據的結構, 沒法針對數據結構進行編程

DataFrame

spark.read
     .csv("...")
     .where($"name" =!= "")
     .groupBy($"name")
     .show()
  • DataFrame 保留有數據的元信息, API 針對數據的結構進行處理, 例如說能夠根據數據的某一列進行排序或者分組

  • DataFrame 在執行的時候會通過 Catalyst 進行優化, 而且序列化更加高效, 性能會更好

  • DataFrame 只能處理結構化的數據, 沒法處理非結構化的數據, 由於 DataFrame 的內部使用 Row 對象保存數據

  • Spark 爲 DataFrame 設計了新的數據讀寫框架, 更增強大, 支持的數據源衆多

Dataset

spark.read
     .csv("...")
     .as[Person]
     .where(_.name != "")
     .groupByKey(_.name)
     .count()
     .show()
  • Dataset 結合了 RDD 和 DataFrame 的特色, 從 API 上便可以處理結構化數據, 也能夠處理非結構化數據

  • Dataset 和 DataFrame 實際上是一個東西, 因此 DataFrame 的性能優點, 在 Dataset 上也有

總結
RDD 的優勢
  1. 面向對象的操做方式

  2. 能夠處理任何類型的數據

RDD 的缺點
  1. 運行速度比較慢, 執行過程沒有優化

  2. API 比較僵硬, 對結構化數據的訪問和操做沒有優化

DataFrame 的優勢
  1. 針對結構化數據高度優化, 能夠經過列名訪問和轉換數據

  2. 增長 Catalyst 優化器, 執行過程是優化的, 避免了由於開發者的緣由影響效率

DataFrame 的缺點
  1. 只能操做結構化數據

  2. 只有無類型的 API, 也就是隻能針對列和 SQL 操做數據, API 依然僵硬

Dataset 的優勢
  1. 結合了 RDD 和 DataFrame 的 API, 既能夠操做結構化數據, 也能夠操做非結構化數據

  2. 既有有類型的 API 也有無類型的 API, 靈活選擇

1.2. Spark 的 序列化 的進化過程

目標和過程
目標

Spark 中的序列化過程決定了數據如何存儲, 是性能優化一個很是重要的着眼點, Spark 的進化並不僅是針對編程模型提供的 API, 在大數據處理中, 也必需要考慮性能

過程
  1. 序列化和反序列化是什麼

  2. Spark 中什麼地方用到序列化和反序列化

  3. RDD 的序列化和反序列化如何實現

  4. Dataset 的序列化和反序列化如何實現

Step 1: 什麼是序列化和序列化

在 Java 中, 序列化的代碼大概以下

public class JavaSerializable implements Serializable { NonSerializable ns = new NonSerializable(); } public class NonSerializable { } public static void main(String[] args) throws IOException { // 序列化 JavaSerializable serializable = new JavaSerializable(); ObjectOutputStream objectOutputStream = new ObjectOutputStream(new FileOutputStream("/tmp/obj.ser")); // 這裏會拋出一個 "java.io.NotSerializableException: cn.itcast.NonSerializable" 異常 objectOutputStream.writeObject(serializable); objectOutputStream.flush(); objectOutputStream.close(); // 反序列化 FileInputStream fileInputStream = new FileInputStream("/tmp/obj.ser"); ObjectInputStream objectOutputStream = new ObjectInputStream(fileInputStream); JavaSerializable serializable1 = objectOutputStream.readObject(); }
序列化是什麼
  • 序列化的做用就是能夠將對象的內容變成二進制, 存入文件中保存

  • 反序列化指的是將保存下來的二進制對象數據恢復成對象

序列化對對象的要求
  • 對象必須實現 Serializable 接口

  • 對象中的全部屬性必須都要能夠被序列化, 若是出現沒法被序列化的屬性, 則序列化失敗

限制
  • 對象被序列化後, 生成的二進制文件中, 包含了不少環境信息, 如對象頭, 對象中的屬性字段等, 因此內容相對較大

  • 由於數據量大, 因此序列化和反序列化的過程比較慢

序列化的應用場景
  • 持久化對象數據

  • 網絡中不能傳輸 Java 對象, 只能將其序列化後傳輸二進制數據

Step 2: 在  Spark 中的序列化和反序列化的應用場景
  • Task 分發

    20190627194356

    Task 是一個對象, 想在網絡中傳輸對象就必需要先序列化

  • RDD 緩存

    val rdd1 = rdd.flatMap(_.split(" "))
       .map((_, 1))
       .reduceByKey(_ + _)
    
    rdd1.cache
    
    rdd1.collect
    • RDD 中處理的是對象, 例如說字符串, Person 對象等

    • 若是緩存 RDD 中的數據, 就須要緩存這些對象

    • 對象是不能存在文件中的, 必需要將對象序列化後, 將二進制數據存入文件

  • 廣播變量

    20190627195544
    • 廣播變量會分發到不一樣的機器上, 這個過程當中須要使用網絡, 對象在網絡中傳輸就必須先被序列化

  • Shuffle 過程

    20190627200225
    • Shuffle 過程是由 Reducer 從 Mapper 中拉取數據, 這裏面涉及到兩個須要序列化對象的緣由

      • RDD 中的數據對象須要在 Mapper 端落盤緩存, 等待拉取

      • Mapper 和 Reducer 要傳輸數據對象

  • Spark Streaming 的 Receiver

    20190627200730
    • Spark Streaming 中獲取數據的組件叫作 Receiver, 獲取到的數據也是對象形式, 在獲取到之後須要落盤暫存, 就須要對數據對象進行序列化

  • 算子引用外部對象

    class Unserializable(i: Int)
    
    rdd.map(i => new Unserializable(i))
       .collect
       .foreach(println)
    • 在 Map 算子的函數中, 傳入了一個 Unserializable 的對象

    • Map 算子的函數是會在整個集羣中運行的, 那 Unserializable 對象就須要跟隨 Map 算子的函數被傳輸到不一樣的節點上

    • 若是 Unserializable 不能被序列化, 則會報錯

Step 3:  RDD 的序列化
20190627202022
RDD 的序列化

RDD 的序列化只能使用 Java 序列化器, 或者 Kryo 序列化器

爲何?
  • RDD 中存放的是數據對象, 要保留全部的數據就必需要對對象的元信息進行保存, 例如對象頭之類的

  • 保存一整個對象, 內存佔用和效率會比較低一些

Kryo 是什麼
  • Kryo 是 Spark 引入的一個外部的序列化工具, 能夠增快 RDD 的運行速度

  • 由於 Kryo 序列化後的對象更小, 序列化和反序列化的速度很是快

  • 在 RDD 中使用 Kryo 的過程以下

    val conf = new SparkConf()
      .setMaster("local[2]")
      .setAppName("KyroTest")
    
    conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    conf.registerKryoClasses(Array(classOf[Person]))
    
    val sc = new SparkContext(conf)
    
    rdd.map(arr => Person(arr(0), arr(1), arr(2)))
Step 4:  DataFrame 和  Dataset 中的序列化
歷史的問題

RDD 中沒法感知數據的組成, 沒法感知數據結構, 只能以對象的形式處理數據

DataFrame 和  Dataset 的特色
  • DataFrame 和 Dataset 是爲結構化數據優化的

  • 在 DataFrame 和 Dataset 中, 數據和數據的 Schema 是分開存儲的

    spark.read
         .csv("...")
         .where($"name" =!= "")
         .groupBy($"name")
         .map(row: Row => row)
         .show()
  • DataFrame 中沒有數據對象這個概念, 全部的數據都以行的形式存在於 Row 對象中, Row 中記錄了每行數據的結構, 包括列名, 類型等

    20190627214134
  • Dataset 中上層能夠提供有類型的 API, 用以操做數據, 可是在內部, 不管是什麼類型的數據對象 Dataset 都使用一個叫作 InternalRow 的類型的對象存儲數據

    val dataset: Dataset[Person] = spark.read.csv(...).as[Person]
優化點 1: 元信息獨立
  1. RDD 不保存數據的元信息, 因此只能使用 Java Serializer 或者 Kyro Serializer 保存 整個對象

  2. DataFrame 和 Dataset 中保存了數據的元信息, 因此能夠把元信息獨立出來分開保存

    20190627233424
  3. 一個 DataFrame 或者一個 Dataset 中, 元信息只須要保存一份, 序列化的時候, 元信息不須要參與

    20190627233851
  4. 在反序列化 ( InternalRow → Object ) 時加入 Schema 信息便可

    20190627234337

元信息再也不參與序列化, 意味着數據存儲量的減小, 和效率的增長

優化點 2: 使用堆外內存
  • DataFrame 和 Dataset 再也不序列化元信息, 因此內存使用大大減小. 同時新的序列化方式還將數據存入堆外內存中, 從而避免 GC 的開銷.

  • 堆外內存又叫作 Unsafe, 之因此叫不安全的, 由於不能使用 Java 的垃圾回收機制, 須要本身負責對象的建立和回收, 性能很好, 可是不建議普通開發者使用, 畢竟不安全

總結
  1. 當須要將對象緩存下來的時候, 或者在網絡中傳輸的時候, 要把對象轉成二進制, 在使用的時候再將二進制轉爲對象, 這個過程叫作序列化和反序列化

  2. 在 Spark 中有不少場景須要存儲對象, 或者在網絡中傳輸對象

    1. Task 分發的時候, 須要將任務序列化, 分發到不一樣的 Executor 中執行

    2. 緩存 RDD 的時候, 須要保存 RDD 中的數據

    3. 廣播變量的時候, 須要將變量序列化, 在集羣中廣播

    4. RDD 的 Shuffle 過程當中 Map 和 Reducer 之間須要交換數據

    5. 算子中若是引入了外部的變量, 這個外部的變量也須要被序列化

  3. RDD 由於不保留數據的元信息, 因此必需要序列化整個對象, 常見的方式是 Java 的序列化器, 和 Kyro 序列化器

  4. Dataset 和 DataFrame 中保留數據的元信息, 因此能夠再也不使用 Java 的序列化器和 Kyro 序列化器, 使用 Spark 特有的序列化協議, 生成 UnsafeInternalRow 用以保存數據, 這樣不只能減小數據量, 也能減小序列化和反序列化的開銷, 其速度大概能達到 RDD 的序列化的 20 倍左右

1.3. Spark Streaming 和 Structured Streaming

目標和過程
目標

理解 Spark Streaming 和 Structured Streaming 之間的區別, 是很是必要的, 從這點上能夠理解 Structured Streaming 的過去和產生契機

過程
  1. Spark Streaming 時代

  2. Structured Streaming 時代

  3. Spark Streaming 和 Structured Streaming

Spark Streaming 時代
20190628010204
  • Spark Streaming 其實就是 RDD 的 API 的流式工具, 其本質仍是 RDD, 存儲和執行過程依然相似 RDD

Structured Streaming 時代
20190628010542
  • Structured Streaming 其實就是 Dataset 的 API 的流式工具, API 和 Dataset 保持高度一致

Spark Streaming 和  Structured Streaming
  • Structured Streaming 相比於 Spark Streaming 的進步就相似於 Dataset 相比於 RDD 的進步

  • 另外還有一點, Structured Streaming 已經支持了連續流模型, 也就是相似於 Flink 那樣的實時流, 而不是小批量, 但在使用的時候仍然有限制, 大部分狀況仍是應該採用小批量模式

在 2.2.0 之後 Structured Streaming 被標註爲穩定版本, 意味着之後的 Spark 流式開發不該該在採用 Spark Streaming 了

2. Structured Streaming 入門案例

目標

瞭解 Structured Streaming 的編程模型, 爲理解 Structured Streaming 時候是什麼, 以及核心體系原理打下基礎

步驟
  1. 需求梳理

  2. Structured Streaming 代碼實現

  3. 運行

  4. 驗證結果

2.1. 需求梳理

目標和過程
目標

理解接下來要作的案例, 有的放矢

步驟
  1. 需求

  2. 總體結構

  3. 開發方式

需求
20190628144128
  • 編寫一個流式計算的應用, 不斷的接收外部系統的消息

  • 對消息中的單詞進行詞頻統計

  • 統計全局的結果

總體結構
20190628131804
  1. Socket Server 等待 Structured Streaming 程序鏈接

  2. Structured Streaming 程序啓動, 鏈接 Socket Server, 等待 Socket Server 發送數據

  3. Socket Server 發送數據, Structured Streaming 程序接收數據

  4. Structured Streaming 程序接收到數據後處理數據

  5. 數據處理後, 生成對應的結果集, 在控制檯打印

開發方式和步驟

Socket server 使用 Netcat nc 來實現

Structured Streaming 程序使用 IDEA 實現, 在 IDEA 中本地運行

  1. 編寫代碼

  2. 啓動 nc 發送 Socket 消息

  3. 運行代碼接收 Socket 消息統計詞頻

總結
  • 簡單來講, 就是要進行流式的詞頻統計, 使用 Structured Streaming

2.2. 代碼實現

目標和過程
目標

實現 Structured Streaming 部分的代碼編寫

步驟
  1. 建立文件

  2. 建立 SparkSession

  3. 讀取 Socket 數據生成 DataFrame

  4. 將 DataFrame 轉爲 Dataset, 使用有類型的 API 處理詞頻統計

  5. 生成結果集, 並寫入控制檯

object SocketProcessor {

  def main(args: Array[String]): Unit = {

    // 1. 建立 SparkSession
    val spark = SparkSession.builder()
      .master("local[6]")
      .appName("socket_processor")
      .getOrCreate()

    spark.sparkContext.setLogLevel("ERROR")    import spark.implicits._ // 2. 讀取外部數據源, 並轉爲 Dataset[String] val source = spark.readStream .format("socket") .option("host", "127.0.0.1") .option("port", 9999) .load() .as[String]  // 3. 統計詞頻 val words = source.flatMap(_.split(" ")) .map((_, 1)) .groupByKey(_._1) .count() // 4. 輸出結果 words.writeStream .outputMode(OutputMode.Complete())  .format("console")  .start()  .awaitTermination()  } }
  調整 Log 級別, 避免過多的 Log 影響視線
  默認 readStream 會返回 DataFrame, 可是詞頻統計更適合使用 Dataset 的有類型 API
  統計全局結果, 而不是一個批次
  將結果輸出到控制檯
  開始運行流式應用
  阻塞主線程, 在子線程中不斷獲取數據
總結
  • Structured Streaming 中的編程步驟依然是先讀, 後處理, 最後落地

  • Structured Streaming 中的編程模型依然是 DataFrame 和 Dataset

  • Structured Streaming 中依然是有外部數據源讀寫框架的, 叫作 readStream 和 writeStream

  • Structured Streaming 和 SparkSQL 幾乎沒有區別, 惟一的區別是, readStream 讀出來的是流, writeStream是將流輸出, 而 SparkSQL 中的批處理使用 read 和 write

2.3. 運行和結果驗證

目標和過程
目標

代碼已經編寫完畢, 須要運行, 並查看結果集, 由於從結果集的樣式中能夠看到 Structured Streaming 的一些原理

步驟
  1. 開啓 Socket server

  2. 運行程序

  3. 查看數據集

開啓  Socket server 和運行程序
  1. 在虛擬機 node01 中運行 nc -lk 9999

  2. 在 IDEA 中運行程序

  3. 在 node01 中輸入如下內容

    hello world
    hello spark
    hello hadoop
    hello spark
    hello spark
查看結果集
-------------------------------------------
Batch: 4
-------------------------------------------
+------+--------+
| value|count(1)|
+------+--------+
| hello|       5|
| spark|       3|
| world|       1|
|hadoop|       1|
+------+--------+

從結果集中能夠觀察到如下內容

  • Structured Streaming 依然是小批量的流處理

  • Structured Streaming 的輸出是相似 DataFrame 的, 也具備 Schema, 因此也是針對結構化數據進行優化的

  • 從輸出的時間特色上來看, 是一個批次先開始, 而後收集數據, 再進行展現, 這一點和 Spark Streaming 不太同樣

總結
  1. 運行的時候須要先開啓 Socket server

  2. Structured Streaming 的 API 和運行也是針對結構化數據進行優化過的

3. Stuctured Streaming 的體系和結構

目標

瞭解 Structured Streaming 的體系結構和核心原理, 有兩點好處, 一是須要了解原理纔好進行性能調優, 二是瞭解原理後, 才能理解代碼執行流程, 從而更好的記憶, 也作到知其然更知其因此然

步驟
  1. WordCount 的執行原理

  2. Structured Streaming 的體系結構

3.1. 無限擴展的表格

目標和過程
目標

Structured Streaming 是一個複雜的體系, 由不少組件組成, 這些組件之間也會進行交互, 若是沒法站在總體視角去觀察這些組件之間的關係, 也沒法理解 Structured Streaming 的全局

步驟
  1. 瞭解 Dataset 這個計算模型和流式計算的關係

  2. 如何使用 Dataset 處理流式數據?

  3. WordCount 案例的執行過程和原理

Dataset 和流式計算

能夠理解爲 Spark 中的 Dataset 有兩種, 一種是處理靜態批量數據的 Dataset, 一種是處理動態實時流的 Dataset, 這兩種 Dataset 之間的區別以下

  • 流式的 Dataset 使用 readStream 讀取外部數據源建立, 使用 writeStream 寫入外部存儲

  • 批式的 Dataset 使用 read 讀取外部數據源建立, 使用 write 寫入外部存儲

如何使用  Dataset 這個編程模型表示流式計算?
20190628191649
  • 能夠把流式的數據想象成一個不斷增加, 無限無界的表

  • 不管是否有界, 全都使用 Dataset 這一套 API

  • 經過這樣的作法, 就能徹底保證流和批的處理使用徹底相同的代碼, 減小這兩種處理方式的差別

WordCount 的原理
20190628232818
  • 整個計算過程大體上分爲以下三個部分

    1. Source, 讀取數據源

    2. Query, 在流式數據上的查詢

    3. Result, 結果集生成

  • 整個的過程以下

    1. 隨着時間段的流動, 對外部數據進行批次的劃分

    2. 在邏輯上, 將緩存全部的數據, 生成一張無限擴展的表, 在這張表上進行查詢

    3. 根據要生成的結果類型, 來選擇是否生成基於整個數據集的結果

總結
20190628235321
  • Dataset 不只能夠表達流式數據的處理, 也能夠表達批量數據的處理

  • Dataset 之因此能夠表達流式數據的處理, 由於 Dataset 能夠模擬一張無限擴展的表, 外部的數據會不斷的流入到其中

3.2. 體系結構

目標和過程
目標

Structured Streaming 是一個複雜的體系, 由不少組件組成, 這些組件之間也會進行交互, 若是沒法站在總體視角去觀察這些組件之間的關係, 也沒法理解 Structured Streaming 的核心原理

步驟
  1. 體系結構

  2. StreamExecution 的執行順序

體系結構
  • 在 Structured Streaming 中負責總體流程和執行的驅動引擎叫作 StreamExecution

    20190629111018

    StreamExecution 在流上進行基於 Dataset 的查詢, 也就是說, Dataset 之因此可以在流上進行查詢, 是由於 StreamExecution 的調度和管理

  • StreamExecution 如何工做?

    20190629100439

    StreamExecution 分爲三個重要的部分

    • Source, 從外部數據源讀取數據

    • LogicalPlan, 邏輯計劃, 在流上的查詢計劃

    • Sink, 對接外部系統, 寫入結果

StreamExecution 的執行順序
20190629113627
  1. 根據進度標記, 從 Source 獲取到一個由 DataFrame 表示的批次, 這個 DataFrame 表示數據的源頭

    val source = spark.readStream
      .format("socket")
      .option("host", "127.0.0.1")
      .option("port", 9999)
      .load()
      .as[String]

    這一點很是相似 val df = spark.read.csv() 所生成的 DataFrame, 一樣都是表示源頭

  2. 根據源頭 DataFrame 生成邏輯計劃

    val words = source.flatMap(_.split(" "))
      .map((_, 1))
      .groupByKey(_._1)
      .count()

    上述代碼表示的就是數據的查詢, 這一個步驟將這樣的查詢步驟生成爲邏輯執行計劃

  3. 優化邏輯計劃最終生成物理計劃

    67b14d92b21b191914800c384cbed439

    這一步其實就是使用 Catalyst 對執行計劃進行優化, 經歷基於規則的優化和基於成本模型的優化

  4. 執行物理計劃將表示執行結果的 DataFrame / Dataset 交給 Sink

    整個物理執行計劃會針對每個批次的數據進行處理, 處理後每個批次都會生成一個表示結果的 Dataset

    Sink 能夠將每個批次的結果 Dataset 落地到外部數據源

  5. 執行完畢後, 彙報 Source 這個批次已經處理結束, Source 提交併記錄最新的進度

增量查詢
  • 核心問題

    20190628232818

    上圖中清晰的展現了最終的結果生成是全局的結果, 而不是一個批次的結果, 可是從 StreamExecution 中能夠看到, 針對流的處理是按照一個批次一個批次來處理的

    那麼, 最終是如何生成全局的結果集呢?

  • 狀態記錄

    20190629115459

    在 Structured Streaming 中有一個全局範圍的高可用 StateStore, 這個時候針對增量的查詢變爲以下步驟

    1. 從 StateStore 中取出上次執行完成後的狀態

    2. 把上次執行的結果加入本批次, 再進行計算, 得出全局結果

    3. 將當前批次的結果放入 StateStore 中, 留待下次使用

    20190629123847
總結
  • StreamExecution 是整個 Structured Streaming 的核心, 負責在流上的查詢

  • StreamExecution 中三個重要的組成部分, 分別是 Source 負責讀取每一個批量的數據, Sink 負責將結果寫入外部數據源, Logical Plan 負責針對每一個小批量生成執行計劃

  • StreamExecution 中使用 StateStore 來進行狀態的維護

4. Source

目標和過程
目標

流式計算通常就是經過數據源讀取數據, 通過一系列處理再落地到某個地方, 因此這一小節先了解一下如何讀取數據, 能夠整合哪些數據源

過程
  1. 從 HDFS 中讀取數據

  2. 從 Kafka 中讀取數據

4.1. 從 HDFS 中讀取數據

目標和過程
目標
  • 在數據處理的時候, 常常會遇到這樣的場景

    20190630160310
  • 有時候也會遇到這樣的場景

    20190630160448
  • 以上兩種場景有兩個共同的特色

    • 會產生大量小文件在 HDFS 上

    • 數據須要處理

  • 經過本章節的學習, 便可以更深入的理解這種結構, 具備使用 Structured Streaming 整合 HDFS, 從其中讀取數據的能力

步驟
  1. 案例結構

  2. 產生小文件並推送到 HDFS

  3. 流式計算統計 HDFS 上的小文件

  4. 運行和總結

4.1.1. 案例結構

目標和步驟
目標

經過本章節能夠了解案例的過程和步驟, 以及案例的核心意圖

步驟
  1. 案例結構

  2. 實現步驟

  3. 難點和易錯點

案例流程
20190715111534
  1. 編寫 Python 小程序, 在某個目錄生成大量小文件

    • Python 是解釋型語言, 其程序能夠直接使用命令運行無需編譯, 因此適合編寫快速使用的程序, 不少時候也使用 Python 代替 Shell

    • 使用 Python 程序建立新的文件, 而且固定的生成一段 JSON 文本寫入文件

    • 在真實的環境中, 數據也是同樣的不斷產生而且被放入 HDFS 中, 可是在真實場景下, 多是 Flume 把小文件不斷上傳到 HDFS 中, 也多是 Sqoop 增量更新不斷在某個目錄中上傳小文件

  2. 使用 Structured Streaming 彙總數據

    • HDFS 中的數據是不斷的產生的, 因此也是流式的數據

    • 數據集是 JSON 格式, 要有解析 JSON 的能力

    • 由於數據是重複的, 要對全局的流數據進行彙總和去重, 其實真實場景下的數據清洗大部分狀況下也是要去重的

  3. 使用控制檯展現數據

    • 最終的數據結果以表的形式呈現

    • 使用控制檯展現數據意味着不須要在修改展現數據的代碼, 將 Sink 部分的內容放在下一個大章節去說明

    • 真實的工做中, 可能數據是要落地到 MySQLHBaseHDFS 這樣的存儲系統中

實現步驟
  • Step 1: 編寫 Python 腳本不斷的產生數據

    1. 使用 Python 建立字符串保存文件中要保存的數據

    2. 建立文件並寫入文件內容

    3. 使用 Python 調用系統 HDFS 命令上傳文件

  • Step 2: 編寫 Structured Streaming 程序處理數據

    1. 建立 SparkSession

    2. 使用 SparkSession 的 readStream 讀取數據源

    3. 使用 Dataset 操做數據, 只須要去重

    4. 使用 Dataset 的 writeStream 設置 Sink 將數據展現在控制檯中

  • Step 3: 部署程序, 驗證結果

    1. 上傳腳本到服務器中, 使用 python 命令運行腳本

    2. 開啓流計算應用, 讀取 HDFS 中對應目錄的數據

    3. 查看運行結果

難點和易錯點
  1. 在讀取 HDFS 的文件時, Source 不只對接數據源, 也負責反序列化數據源中傳過來的數據

    • Source 能夠從不一樣的數據源中讀取數據, 如 KafkaHDFS

    • 數據源可能會傳過來不一樣的數據格式, 如 JSONParquet

  2. 讀取 HDFS 文件的這個 Source 叫作 FileStreamSource

    從命名就能夠看出來這個 Source 不只支持 HDFS, 還支持本地文件讀取, 亞馬遜雲, 阿里雲 等文件系統的讀取, 例如: file://s3://oss://

  3. 基於流的 Dataset 操做和基於靜態數據集的 Dataset 操做是一致的

總結

整個案例運行的邏輯是

  1. Python 程序產生數據到 HDFS 中

  2. Structured Streaming 從 HDFS 中獲取數據

  3. Structured Streaming 處理數據

  4. 將數據展現在控制檯

整個案例的編寫步驟

  1. Python 程序

  2. Structured Streaming 程序

  3. 運行

4.1.2. 產生小文件並推送到 HDFS

目標和步驟
目標

經過本章節看到 Python 的大體語法, 並瞭解 Python 如何編寫腳本完成文件的操做, 其實不一樣的語言使用起來並無那麼難, 完成一些簡單的任務仍是很簡單的

步驟
  1. 建立 Python 代碼文件

  2. 編寫代碼

  3. 本地測試, 可是由於本地環境搭建比較浪費你們時間, 因此暫時再也不本地測試

代碼編寫
  • 隨便在任一目錄中建立文件 gen_files.py, 編寫如下內容

import os for index in range(100): content = """ {"name":"Michael"} {"name":"Andy", "age":30} {"name":"Justin", "age":19} """ file_name = "/export/dataset/text{0}.json".format(index) with open(file_name, "w") as file:  file.write(content) os.system("/export/servers/hadoop/bin/hdfs dfs -mkdir -p /dataset/dataset/") os.system("/export/servers/hadoop/bin/hdfs dfs -put {0} /dataset/dataset/".format(file_name))
  建立文件, 使用這樣的寫法是由於 with 是一種 Python 的特殊語法, 若是使用 with 去建立文件的話, 使用結束後會自動關閉流
總結
  • Python 的語法靈活而乾淨, 比較易於編寫

  • 對於其它的語言能夠玩樂性質的去使用, 其實並無很難

4.1.3. 流式計算統計 HDFS 上的小文件

目標和步驟
目標

經過本章節的學習, 你們能夠了解到如何使用 Structured Streaming 讀取 HDFS 中的文件, 並以 JSON 的形式解析

步驟
  1. 建立文件

  2. 編寫代碼

代碼
val spark = SparkSession.builder()
  .appName("hdfs_source")
  .master("local[6]")
  .getOrCreate()

spark.sparkContext.setLogLevel("WARN")

val userSchema = new StructType()
  .add("name", "string")
  .add("age", "integer")

val source = spark
  .readStream
  .schema(userSchema)
  .json("hdfs://node01:8020/dataset/dataset")

val result = source.distinct()

result.writeStream
  .outputMode(OutputMode.Update())
  .format("console")
  .start()
  .awaitTermination()
總結
  • 以流的形式讀取某個 HDFS 目錄的代碼爲

    val source = spark
      .readStream          .schema(userSchema)  .json("hdfs://node01:8020/dataset/dataset") 
      指明讀取的是一個流式的 Dataset
      指定讀取到的數據的 Schema
      指定目錄位置, 以及數據格式

4.1.4. 運行和流程總結

目標和步驟
目標

經過這個小節對案例的部署之後, 不只你們能夠學到一種常見的部署方式, 同時也能對案例的執行流程和流計算有更深刻的瞭解

步驟
  1. 運行 Python 程序

  2. 運行 Spark 程序

  3. 總結

運行 Python 程序
  1. 上傳 Python 源碼文件到服務器中

  2. 運行 Python 腳本

    # 進入 Python 文件被上傳的位置 cd ~  # 建立放置生成文件的目錄 mkdir -p /export/dataset  # 運行程序 python gen_files.py
運行 Spark 程序
  1. 使用 Maven 打包

    20190716000942
  2. 上傳至服務器

  3. 運行 Spark 程序

    # 進入保存 Jar 包的文件夾
    cd ~
    
    # 運行流程序
    spark-submit --class cn.itcast.structured.HDFSSource ./original-streaming-0.0.1.jar
總結
20190715111534
  1. Python 生成文件到 HDFS, 這一步在真實環境下, 多是由 Flume 和 Sqoop 收集並上傳至 HDFS

  2. Structured Streaming 從 HDFS 中讀取數據並處理

  3. Structured Streaming 講結果表展現在控制檯

4.2. 從 Kafka 中讀取數據

目標和步驟
目標

經過本章節的學習, 即可以理解流式系統和隊列間的關係, 同時可以編寫代碼從 Kafka 以流的方式讀取數據

步驟
  1. Kafka 回顧

  2. Structured Streaming 整合 Kafka

  3. 讀取 JSON 格式的內容

  4. 讀取多個 Topic 的數據

4.2.1 Kafka 的場景和結構

目標和步驟
目標

經過這一個小節的學習, 你們能夠理解 Kfaka 在整個系統中的做用, 往後工做的話, 也必需要先站在更高層去理解系統的組成, 才能完成功能和代碼

步驟
  1. Kafka 的應用場景

  2. Kafka 的特色

  3. Topic 和 Partitions

Kafka 是一個 Pub / Sub 系統
  • Pub / Sub 是 Publisher / Subscriber 的簡寫, 中文稱做爲發佈訂閱系統

    20190717102628
  • 發佈訂閱系統能夠有多個 Publisher 對應一個 Subscriber, 例如多個系統都會產生日誌, 經過這樣的方式, 一個日誌處理器能夠簡單的獲取全部系統產生的日誌

    20190717103721
  • 發佈訂閱系統也能夠一個 Publisher 對應多個 Subscriber, 這樣就相似於廣播了, 例如經過這樣的方式能夠很是輕易的將一個訂單的請求分發給全部感興趣的系統, 減小耦合性

    20190717104041
  • 固然, 在大數據系統中, 這樣的消息系統每每能夠做爲整個數據平臺的入口, 左邊對接業務系統各個模塊, 右邊對接數據系統各個計算工具

    20190717104853
Kafka 的特色

Kafka 有一個很是重要的應用場景就是對接業務系統和數據系統, 做爲一個數據管道, 其須要流通的數據量驚人, 因此 Kafka若是要知足這種場景的話, 就必定具備如下兩個特色

  • 高吞吐量

  • 高可靠性

Topic 和 Partitions
  • 消息和事件常常是不一樣類型的, 例如用戶註冊是一種消息, 訂單建立也是一種消息

    20190717110142
  • Kafka 中使用 Topic 來組織不一樣類型的消息

    20190717110431
  • Kafka 中的 Topic 要承受很是大的吞吐量, 因此 Topic 應該是能夠分片的, 應該是分佈式的

    20190717122114
總結
  • Kafka 的應用場景

    • 通常的系統中, 業務系統會不止一個, 數據系統也會比較複雜

    • 爲了減小業務系統和數據系統之間的耦合, 要將其分開, 使用一箇中間件來流轉數據

    • Kafka 由於其吞吐量超高, 因此適用於這種場景

  • Kafka 如何保證高吞吐量

    • 由於消息會有不少種類, Kafka 中能夠建立多個隊列, 每個隊列就是一個 Topic, 能夠理解爲是一個主題, 存放相關的消息

    • 由於 Topic 直接存放消息, 因此 Topic 必需要可以承受很是大的通量, 因此 Topic 是分佈式的, 是能夠分片的, 使用分佈式的並行處理能力來解決高通量的問題

4.2.2. Kafka 和 Structured Streaming 整合的結構

目標和步驟
目標

經過本小節能夠理解 Kafka 和 Structured Streaming 整合的結構原理, 同時還能理解 Spark 鏈接 Kafka 的時候一個很是重要的參數

步驟
  1. Topic 的 Offset

  2. Kafka 和 Structured Streaming 的整合結構

  3. Structured Streaming 讀取 Kafka 消息的三種方式

Topic 的 Offset
  • Topic 是分區的, 每個 Topic 的分區分佈在不一樣的 Broker 上

    20190717161413
  • 每一個分區都對應一系列的 Log 文件, 消息存在於 Log 中, 消息的 ID 就是這條消息在本分區的 Offset 偏移量

    20190717162840
 

Offset 又稱做爲偏移量, 其實就是一個東西距離另一個東西的距離

20190717165649

Kafka 中使用 Offset 命名消息, 而不是指定 ID 的緣由是想表示永遠自增, ID 是能夠指定的, 可是 Offset 只能是一個距離值, 它只會愈來愈大, 因此, 叫作 Offset 而不叫 ID 也是這個考慮, 消息只能追加到 Log 末尾, 只能增加不能減小

Kafka 和 Structured Streaming 整合的結構
20190718022525
分析
  • Structured Streaming 中使用 Source 對接外部系統, 對接 Kafka 的 Source 叫作 KafkaSource

  • KafkaSource 中會使用 KafkaSourceRDD 來映射外部 Kafka 的 Topic, 二者的 Partition 一一對應

結論

Structured Streaming 會並行的從 Kafka 中獲取數據

Structured Streaming 讀取 Kafka 消息的三種方式
20190718023534
  • Earliest 從每一個 Kafka 分區最開始處開始獲取

  • Assign 手動指定每一個 Kafka 分區中的 Offset

  • Latest 再也不處理以前的消息, 只獲取流計算啓動後新產生的數據

總結
  • Kafka 中的消息存放在某個 Topic 的某個 Partition 中, 消息是不可變的, 只會在消息過時的時候從最先的消息開始刪除, 消息的 ID 也叫作 Offset, 而且只能正增加

  • Structured Streaming 整合 Kafka 的時候, 會並行的經過 Offset 從全部 Topic 的 Partition 中獲取數據

  • Structured Streaming 在從 Kafka 讀取數據的時候, 能夠選擇從最先的地方開始讀取, 也能夠選擇從任意位置讀取, 也能夠選擇只讀取最新的

4.2.3. 需求介紹

目標和步驟
目標

經過本章節的學習, 能夠掌握一個常見的需求, 而且瞭解後面案例的編寫步驟

步驟
  1. 需求

  2. 數據

需求
  1. 模擬一個智能物聯網系統的數據統計

    20190718151808
    • 有一個智能家居品牌叫作 Nest, 他們主要有兩款產品, 一個是恆溫器, 一個是攝像頭

    • 恆溫器的主要做用是經過感應器識別家裏何時有人, 攝像頭主要做用是經過學習算法來識別出如今攝像頭中的人是不是家裏人, 若是不是則報警

    • 因此這兩個設備都須要統計一個指標, 就是家裏何時有人, 此需求就是針對這個設備的一部分數據, 來統計家裏何時有人

  2. 使用生產者在 Kafka 的 Topic : streaming-test 中輸入 JSON 數據

    {
      "devices": { "cameras": { "device_id": "awJo6rH", "last_event": { "has_sound": true, "has_motion": true, "has_person": true, "start_time": "2016-12-29T00:00:00.000Z", "end_time": "2016-12-29T18:42:00.000Z" } } } }
  3. 使用 Structured Streaming 來過濾出來家裏有人的數據

    把數據轉換爲 時間 → 是否有人 這樣相似的形式

數據轉換
  1. 追蹤 JSON 數據的格式

    能夠在一個在線的工具 https://jsonformatter.org/ 中格式化 JSON, 會發現 JSON 格式以下

    20190720000717
  2. 反序列化

    JSON 數據本質上就是字符串, 只不過這個字符串是有結構的, 雖然有結構, 可是很難直接從字符串中取出某個值

    而反序列化, 就是指把 JSON 數據轉爲對象, 或者轉爲 DataFrame, 能夠直接使用某一個列或者某一個字段獲取數據, 更加方便

    而想要作到這件事, 必需要先根據數據格式, 編寫 Schema 對象, 從而經過一些方式轉爲 DataFrame

    val eventType = new StructType()
      .add("has_sound", BooleanType, nullable = true)
      .add("has_motion", BooleanType, nullable = true)
      .add("has_person", BooleanType, nullable = true)
      .add("start_time", DateType, nullable = true)
      .add("end_time", DateType, nullable = true)
    
    val camerasType = new StructType()
      .add("device_id", StringType, nullable = true)
      .add("last_event", eventType, nullable = true)
    
    val devicesType = new StructType()
      .add("cameras", camerasType, nullable = true)
    
    val schema = new StructType()
      .add("devices", devicesType, nullable = true)
總結
  1. 業務簡單來講, 就是收集智能家居設備的數據, 經過流計算的方式計算其特徵規律

  2. Kafka 常見的業務場景就是對接業務系統和數據系統

    1. 業務系統常常會使用 JSON 做爲數據傳輸格式

    2. 因此使用 Structured Streaming 來對接 Kafka 並反序列化 Kafka 中的 JSON 格式的消息, 是一個很是重要的技能

  3. 不管使用什麼方式, 若是想反序列化 JSON 數據, 就必需要先追蹤 JSON 數據的結構

4.2.4. 使用 Spark 流計算鏈接 Kafka 數據源

目標和步驟
目標

經過本章節的數據, 可以掌握如何使用 Structured Streaming 對接 Kafka, 從其中獲取數據

步驟
  1. 建立 Topic 並輸入數據到 Topic

  2. Spark 整合 kafka

  3. 讀取到的 DataFrame 的數據結構

建立 Topic 並輸入數據到 Topic
  1. 使用命令建立 Topic

    bin/kafka-topics.sh --create --topic shoppingStreaming --replication-factor 1 --partitions 3 --zookeeper node01:2181,node02:2181,node03:2181/kafka

  2. 開啓 Producer

    bin/kafka-console-producer.sh --broker-list node01:9092,node02:9092,node03:9092 --topic shoppingStreaming
    測試:開啓consumer端: bin/kafka-console-consumer.sh --from-beginning --topicshoppingStreaming--zookeeper node01:2181,node02:2181,node03:2181/kafka
  3. 把 JSON 轉爲單行輸入

    {"devices":{"cameras":{"device_id":"awJo6rH","last_event":{"has_sound":true,"has_motion":true,"has_person":true,"start_time":"2016-12-29T00:00:00.000Z","end_time":"2016-12-29T18:42:00.000Z"}}}}
使用 Spark 讀取 Kafka 的 Topic
  1. 編寫 Spark 代碼讀取 Kafka Topic

    val source = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "node01:9092,node01:9092,node03:9092")
      .option("subscribe", "streaming_test")
      .option("startingOffsets", "earliest")
      .load()
    • 三個參數

      • kafka.bootstrap.servers : 指定 Kafka 的 Server 地址

      • subscribe : 要監聽的 Topic, 能夠傳入多個, 傳入多個 Topic 則監聽多個 Topic, 也可使用 topic-* 這樣的通配符寫法

      • startingOffsets : 從什麼位置開始獲取數據, 可選值有 earliestassignlatest

    • format 設置爲 Kafka 指定使用 KafkaSource 讀取數據

  2. 思考: 從 Kafka 中應該獲取到什麼?

    • 業務系統有不少種類型, 有多是 Web 程序, 有多是物聯網

      20190720132133

      前端大多數狀況下使用 JSON 作數據交互

    • 問題1: 業務系統如何把數據給 Kafka ?

      20190720134513

      能夠主動或者被動的把數據交給 Kafka, 可是不管使用什麼方式, 都在使用 Kafka 的 Client 類庫來完成這件事, Kafka 的類庫調用方式以下

      Producer<String, String> producer = new KafkaProducer<String, String>(properties); producer.send(new ProducerRecord<String, String>("HelloWorld", msg));

      其中發給 Kafka 的消息是 KV 類型的

    • 問題2: 使用 Structured Streaming 訪問 Kafka 獲取數據的時候, 須要什麼東西呢?

      • 需求1: 存儲當前處理過的 Kafka 的 Offset

      • 需求2: 對接多個 Kafka Topic 的時候, 要知道這條數據屬於哪一個 Topic

    • 結論

      • Kafka 中收到的消息是 KV 類型的, 有 Key, 有 Value

      • Structured Streaming 對接 Kafka 的時候, 每一條 Kafka 消息不能只是 KV, 必需要有 TopicPartition 之類的信息

  3. 從 Kafka 獲取的 DataFrame 格式

    source.printSchema()

    結果以下

    root
     |-- key: binary (nullable = true)
     |-- value: binary (nullable = true)
     |-- topic: string (nullable = true)
     |-- partition: integer (nullable = true)
     |-- offset: long (nullable = true)
     |-- timestamp: timestamp (nullable = true)
     |-- timestampType: integer (nullable = true)

    從 Kafka 中讀取到的並非直接是數據, 而是一個包含各類信息的表格, 其中每一個字段的含義以下

    Key 類型 解釋

    key

    binary

    Kafka 消息的 Key

    value

    binary

    Kafka 消息的 Value

    topic

    string

    本條消息所在的 Topic, 由於整合的時候一個 Dataset 能夠對接多個 Topic, 因此有這樣一個信息

    partition

    integer

    消息的分區號

    offset

    long

    消息在其分區的偏移量

    timestamp

    timestamp

    消息進入 Kafka 的時間戳

    timestampType

    integer

    時間戳類型

總結
  1. 必定要把 JSON 轉爲一行, 再使用 Producer 發送, 否則會出現獲取多行的狀況

  2. 使用 Structured Streaming 鏈接 Kafka 的時候, 須要配置以下三個參數

    • kafka.bootstrap.servers : 指定 Kafka 的 Server 地址

    • subscribe : 要監聽的 Topic, 能夠傳入多個, 傳入多個 Topic 則監聽多個 Topic, 也可使用 topic-* 這樣的通配符寫法

    • startingOffsets : 從什麼位置開始獲取數據, 可選值有 earliestassignlatest

  3. 從 Kafka 獲取到的 DataFrame 的 Schema 以下

    root
     |-- key: binary (nullable = true)
     |-- value: binary (nullable = true)
     |-- topic: string (nullable = true)
     |-- partition: integer (nullable = true)
     |-- offset: long (nullable = true)
     |-- timestamp: timestamp (nullable = true)
     |-- timestampType: integer (nullable = true)

4.2.5. JSON 解析和數據統計

目標和步驟
目標

經過本章的學習, 便可以解析 Kafka 中的 JSON 數據, 這是一個重點中的重點

步驟
  1. JSON 解析

  2. 數據處理

  3. 運行測試

JSON 解析
  1. 準備好 JSON 所在的列

    問題

    由 Dataset 的結構能夠知道 key 和 value 列的類型都是 binary 二進制, 因此要將其轉爲字符串, 纔可進行 JSON 解析

    解決方式
    source.selectExpr("CAST(key AS STRING) as key", "CAST(value AS STRING) as value")
  2. 編寫 Schema 對照 JSON 的格式

    • Key 要對應 JSON 中的 Key

    • Value 的類型也要對應 JSON 中的 Value 類型

    val eventType = new StructType()
      .add("has_sound", BooleanType, nullable = true)
      .add("has_motion", BooleanType, nullable = true)
      .add("has_person", BooleanType, nullable = true)
      .add("start_time", DateType, nullable = true)
      .add("end_time", DateType, nullable = true)
    
    val camerasType = new StructType()
      .add("device_id", StringType, nullable = true)
      .add("last_event", eventType, nullable = true)
    
    val devicesType = new StructType()
      .add("cameras", camerasType, nullable = true)
    
    val schema = new StructType()
      .add("devices", devicesType, nullable = true)
  3. 由於 JSON 中包含 Date 類型的數據, 因此要指定時間格式化方式

    val jsonOptions = Map("timestampFormat" -> "yyyy-MM-dd'T'HH:mm:ss.sss'Z'")
  4. 使用 from_json 這個 UDF 格式化 JSON

    .select(from_json('value, schema, jsonOptions).alias("parsed_value"))
  5. 選擇格式化事後的 JSON 中的字段

    由於 JSON 被格式化事後, 已經變爲了 StructType, 因此能夠直接獲取其中某些字段的值

    .selectExpr("parsed_value.devices.cameras.last_event.has_person as has_person",
              "parsed_value.devices.cameras.last_event.start_time as start_time")
數據處理
  1. 統計各個時段有人的數據

    .filter('has_person === true)
    .groupBy('has_person, 'start_time)
    .count()
  2. 將數據落地到控制檯

    result.writeStream
      .outputMode(OutputMode.Complete())
      .format("console")
      .start()
      .awaitTermination()
所有代碼
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .master("local[6]")
  .appName("kafka integration")
  .getOrCreate()

import org.apache.spark.sql.streaming.OutputMode
import org.apache.spark.sql.types._

val source = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "node01:9092,node02:9092,node03:9092")
  .option("subscribe", "streaming-test")
  .option("startingOffsets", "earliest")
  .load()

val eventType = new StructType()
  .add("has_sound", BooleanType, nullable = true)
  .add("has_motion", BooleanType, nullable = true)
  .add("has_person", BooleanType, nullable = true)
  .add("start_time", DateType, nullable = true)
  .add("end_time", DateType, nullable = true)

val camerasType = new StructType()
  .add("device_id", StringType, nullable = true)
  .add("last_event", eventType, nullable = true)

val devicesType = new StructType()
  .add("cameras", camerasType, nullable = true)

val schema = new StructType()
  .add("devices", devicesType, nullable = true)

val jsonOptions = Map("timestampFormat" -> "yyyy-MM-dd'T'HH:mm:ss.sss'Z'")

import org.apache.spark.sql.functions._
import spark.implicits._

val result = source.selectExpr("CAST(key AS STRING) as key", "CAST(value AS STRING) as value")
    .select(from_json('value, schema, jsonOptions).alias("parsed_value"))
    .selectExpr("parsed_value.devices.cameras.last_event.has_person as has_person",
      "parsed_value.devices.cameras.last_event.start_time as start_time")
    .filter('has_person === true)
    .groupBy('has_person, 'start_time)
    .count()

result.writeStream
  .outputMode(OutputMode.Complete())
  .format("console")
  .start()
  .awaitTermination()
運行測試
  1. 進入服務器中, 啓動 Kafka

  2. 啓動 Kafka 的 Producer

    bin/kafka-console-producer.sh --broker-list node01:9092,node02:9092,node03:9092 --topic streaming-test
  3. 啓動 Spark shell 並拷貝代碼進行測試

    ./bin/spark-shell --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.2.0
    • 由於須要和 Kafka 整合, 因此在啓動的時候須要加載和 Kafka 整合的包 spark-sql-kafka-0-10

5. Sink

目標和步驟
目標
  • 可以串聯兩端, 理解整個流式應用, 以及其中的一些根本的原理, 好比說容錯語義

  • 可以知道如何對接外部系統, 寫入數據

步驟
  1. HDFS Sink

  2. Kafka Sink

  3. Foreach Sink

  4. 自定義 Sink

  5. Tiggers

  6. Sink 原理

  7. 錯誤恢復和容錯語義

5.1. HDFS Sink

目標和步驟
目標

可以使用 Spark 將流式數據的處理結果放入 HDFS

步驟
  1. 場景和需求

  2. 代碼實現

場景和需求
場景
  • Kafka 每每做爲數據系統和業務系統之間的橋樑

  • 數據系統通常由批量處理和流式處理兩個部分組成

  • 在 Kafka 做爲整個數據平臺入口的場景下, 須要使用 StructuredStreaming 接收 Kafka 的數據並放置於 HDFS 上, 後續才能夠進行批量處理

20190808023517
案例需求
  • 從 Kafka 接收數據, 從給定的數據集中, 裁剪部分列, 落地於 HDFS

代碼實現
步驟說明
  1. 從 Kafka 讀取數據, 生成源數據集

    1. 鏈接 Kafka 生成 DataFrame

    2. 從 DataFrame 中取出表示 Kafka 消息內容的 value 列並轉爲 String 類型

  2. 對源數據集選擇列

    1. 解析 CSV 格式的數據

    2. 生成正確類型的結果集

  3. 落地 HDFS

總體代碼
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .master("local[6]")
  .appName("kafka integration")
  .getOrCreate()

import spark.implicits._

val source = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "node01:9092,node02:9092,node03:9092")
  .option("subscribe", "streaming-bank")
  .option("startingOffsets", "earliest")
  .load()
  .selectExpr("CAST(value AS STRING)")
  .as[String]

val result = source.map {
  item =>
    val arr = item.replace("\"", "").split(";")
    (arr(0).toInt, arr(1).toInt, arr(5).toInt)
}
.as[(Int, Int, Int)]
.toDF("age", "job", "balance")

result.writeStream
  .format("parquet") // 也能夠是 "orc", "json", "csv" 等
  .option("path", "/dataset/streaming/result/")
  .start()

5.2. Kafka Sink

目標和步驟
目標

掌握何時要將流式數據落地至 Kafka, 以及如何落地至 Kafka

步驟
  1. 場景

  2. 代碼

場景
場景
  • 有不少時候, ETL 事後的數據, 須要再次放入 Kafka

  • 在 Kafka 後, 可能會有流式程序統一將數據落地到 HDFS 或者 HBase

20190809014210
案例需求
  • 從 Kafka 中獲取數據, 簡單處理, 再次放入 Kafka

代碼
步驟
  1. 從 Kafka 讀取數據, 生成源數據集

    1. 鏈接 Kafka 生成 DataFrame

    2. 從 DataFrame 中取出表示 Kafka 消息內容的 value 列並轉爲 String 類型

  2. 對源數據集選擇列

    1. 解析 CSV 格式的數據

    2. 生成正確類型的結果集

  3. 再次落地 Kafka

代碼
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .master("local[6]")
  .appName("kafka integration")
  .getOrCreate()

import spark.implicits._

val source = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "node01:9092,node02:9092,node03:9092")
  .option("subscribe", "streaming-bank")
  .option("startingOffsets", "earliest")
  .load()
  .selectExpr("CAST(value AS STRING)")
  .as[String]

val result = source.map {
  item =>
    val arr = item.replace("\"", "").split(";")
    (arr(0).toInt, arr(1).toInt, arr(5).toInt)
}
.as[(Int, Int, Int)]
.toDF("age", "job", "balance")

result.writeStream
  .format("kafka")
  .outputMode(OutputMode.Append())
  .option("kafka.bootstrap.servers", "node01:9092,node02:9092,node03:9092")
  .option("topic", "streaming-bank-result")
  .start()
  .awaitTermination()

5.3. Foreach Writer

目標和步驟
目標

掌握 Foreach 模式理解如何擴展 Structured Streaming 的 Sink, 同時可以將數據落地到 MySQL

步驟
  1. 需求

  2. 代碼

需求
  • 場景

    • 大數據有一個常見的應用場景

      1. 收集業務系統數據

      2. 數據處理

      3. 放入 OLTP 數據

      4. 外部經過 ECharts 獲取並處理數據

    • 這個場景下, StructuredStreaming 就須要處理數據並放入 MySQL 或者 MongoDBHBase 中以供 Web 程序能夠獲取數據, 圖表的形式展現在前端

    20190809115742
  • Foreach 模式::

    • 原由

      • 在 Structured Streaming 中, 並未提供完整的 MySQL/JDBC 整合工具

      • 不止 MySQL 和 JDBC, 可能會有其它的目標端須要寫入

      • 不少時候 Structured Streaming 須要對接一些第三方的系統, 例如阿里雲的雲存儲, 亞馬遜雲的雲存儲等, 可是 Spark 沒法對全部第三方都提供支持, 有時候須要本身編寫

    • 解決方案

      20190809122425
      • 既然沒法知足全部的整合需求, StructuredStreaming 提供了 Foreach, 能夠拿到每個批次的數據

      • 經過 Foreach 拿到數據後, 能夠經過自定義寫入方式, 從而將數據落地到其它的系統

  • 案例需求::

    20190809122804
    • 從 Kafka 中獲取數據, 處理後放入 MySQL

代碼
步驟
  1. 建立 DataFrame 表示 Kafka 數據源

  2. 在源 DataFrame 中選擇三列數據

  3. 建立 ForeachWriter 接收每個批次的數據落地 MySQL

  4. Foreach 落地數據

代碼
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .master("local[6]")
  .appName("kafka integration")
  .getOrCreate()

import spark.implicits._

val source = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "node01:9092,node02:9092,node03:9092")
  .option("subscribe", "streaming-bank")
  .option("startingOffsets", "earliest")
  .load()
  .selectExpr("CAST(value AS STRING)")
  .as[String]

val result = source.map {
  item =>
    val arr = item.replace("\"", "").split(";")
    (arr(0).toInt, arr(1).toInt, arr(5).toInt)
}
.as[(Int, Int, Int)]
.toDF("age", "job", "balance")

class MySQLWriter extends ForeachWriter[Row] {
  val driver = "com.mysql.jdbc.Driver"
  var statement: Statement = _
  var connection: Connection  = _
  val url: String = "jdbc:mysql://node01:3306/streaming-bank-result"
  val user: String = "root"
  val pwd: String = "root"

  override def open(partitionId: Long, version: Long): Boolean = {
    Class.forName(driver)
    connection = DriverManager.getConnection(url, user, pwd)
    this.statement = connection.createStatement
    true
  }

  override def process(value: Row): Unit = {
    statement.executeUpdate(s"insert into bank values(" +
      s"${value.getAs[Int]("age")}, " +
      s"${value.getAs[Int]("job")}, " +
      s"${value.getAs[Int]("balance")} )")
  }

  override def close(errorOrNull: Throwable): Unit = {
    connection.close()
  }
}

result.writeStream
  .foreach(new MySQLWriter)
  .start()
  .awaitTermination()

5.4. 自定義 Sink

目標和步驟
目標
  • Foreach 傾向於一次處理一條數據, 若是想拿到 DataFrame 冪等的插入外部數據源, 則須要自定義 Sink

  • 瞭解如何自定義 Sink

步驟
  1. Spark 加載 Sink 流程分析

  2. 自定義 Sink

Spark 加載 Sink 流程分析
  • Sink 加載流程

    1. writeStream 方法中會建立一個 DataStreamWriter 對象

      def writeStream: DataStreamWriter[T] = {
        if (!isStreaming) {
          logicalPlan.failAnalysis(
            "'writeStream' can be called only on streaming Dataset/DataFrame")
        }
        new DataStreamWriter[T](this)
      }
    2. 在 DataStreamWriter 對象上經過 format 方法指定 Sink 的短名並記錄下來

      def format(source: String): DataStreamWriter[T] = {
        this.source = source
        this
      }
    3. 最終會經過 DataStreamWriter 對象上的 start 方法啓動執行, 其中會經過短名建立 DataSource

      val dataSource =
          DataSource(
            df.sparkSession,
            className = source,  options = extraOptions.toMap, partitionColumns = normalizedParCols.getOrElse(Nil))
        傳入的 Sink 短名
    4. 在建立 DataSource 的時候, 會經過一個複雜的流程建立出對應的 Source 和 Sink

      lazy val providingClass: Class[_] = DataSource.lookupDataSource(className)
    5. 在這個複雜的建立流程中, 有一行最關鍵的代碼, 就是經過 Java 的類加載器加載全部的 DataSourceRegister

      val serviceLoader = ServiceLoader.load(classOf[DataSourceRegister], loader)
    6. 在 DataSourceRegister 中會建立對應的 Source 或者 Sink

      trait DataSourceRegister {
      
        def shortName(): String       } trait StreamSourceProvider { def createSource(  sqlContext: SQLContext, metadataPath: String, schema: Option[StructType], providerName: String, parameters: Map[String, String]): Source } trait StreamSinkProvider { def createSink(  sqlContext: SQLContext, parameters: Map[String, String], partitionColumns: Seq[String], outputMode: OutputMode): Sink }
        提供短名
        建立 Source
        建立 Sink
  • 自定義 Sink 的方式

    • 根據前面的流程說明, 有兩點很是重要

      • Spark 會自動加載全部 DataSourceRegister 的子類, 因此須要經過 DataSourceRegister 加載 Source 和 Sink

      • Spark 提供了 StreamSinkProvider 用以建立 Sink, 提供必要的依賴

    • 因此若是要建立自定義的 Sink, 須要作兩件事

      1. 建立一個註冊器, 繼承 DataSourceRegister 提供註冊功能, 繼承 StreamSinkProvider 獲取建立 Sink的必備依賴

      2. 建立一個 Sink 子類

自定義 Sink
步驟
  1. 讀取 Kafka 數據

  2. 簡單處理數據

  3. 建立 Sink

  4. 建立 Sink 註冊器

  5. 使用自定義 Sink

代碼
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .master("local[6]")
  .appName("kafka integration")
  .getOrCreate()

import spark.implicits._

val source = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "node01:9092,node02:9092,node03:9092")
  .option("subscribe", "streaming-bank")
  .option("startingOffsets", "earliest")
  .load()
  .selectExpr("CAST(value AS STRING)")
  .as[String]

val result = source.map {
  item =>
    val arr = item.replace("\"", "").split(";")
    (arr(0).toInt, arr(1).toInt, arr(5).toInt)
}
  .as[(Int, Int, Int)]
  .toDF("age", "job", "balance")

class MySQLSink(options: Map[String, String], outputMode: OutputMode) extends Sink {

  override def addBatch(batchId: Long, data: DataFrame): Unit = {
    val userName = options.get("userName").orNull
    val password = options.get("password").orNull
    val table = options.get("table").orNull
    val jdbcUrl = options.get("jdbcUrl").orNull

    val properties = new Properties
    properties.setProperty("user", userName)
    properties.setProperty("password", password)

    data.write.mode(outputMode.toString).jdbc(jdbcUrl, table, properties)
  }
}

class MySQLStreamSinkProvider extends StreamSinkProvider with DataSourceRegister {

  override def createSink(sqlContext: SQLContext,
                          parameters: Map[String, String],
                          partitionColumns: Seq[String],
                          outputMode: OutputMode): Sink = {
    new MySQLSink(parameters, outputMode)
  }

  override def shortName(): String = "mysql"
}

result.writeStream
  .format("mysql")
  .option("username", "root")
  .option("password", "root")
  .option("table", "streaming-bank-result")
  .option("jdbcUrl", "jdbc:mysql://node01:3306/test")
  .start()
  .awaitTermination()

5.5. Tigger

目標和步驟
目標

掌握如何控制 StructuredStreaming 的處理時間

步驟
  1. 微批次處理

  2. 連續流處理

微批次處理
  • 什麼是微批次

    20190628144128
    • 並非真正的流, 而是緩存一個批次週期的數據, 後處理這一批次的數據

  • 通用流程

    步驟
    1. 根據 Spark 提供的調試用的數據源 Rate 建立流式 DataFrame

      • Rate 數據源會按期提供一個由兩列 timestamp, value 組成的數據, value 是一個隨機數

    2. 處理和聚合數據, 計算每一個個位數和十位數各有多少條數據

      • 對 value 求 log10 便可得出其位數

      • 後按照位數進行分組, 最終就能夠看到每一個位數的數據有多少個

    代碼
    val spark = SparkSession.builder()
      .master("local[6]")
      .appName("socket_processor")
      .getOrCreate()
    
    import org.apache.spark.sql.functions._
    import spark.implicits._
    
    spark.sparkContext.setLogLevel("ERROR")
    
    val source = spark.readStream
      .format("rate")
      .load()
    
    val result = source.select(log10('value) cast IntegerType as 'key, 'value)
        .groupBy('key)
        .agg(count('key) as 'count)
        .select('key, 'count)
        .where('key.isNotNull)
        .sort('key.asc)
  • 默認方式劃分批次

    介紹

    默認狀況下的 Structured Streaming 程序會運行在微批次的模式下, 當一個批次結束後, 下一個批次會當即開始處理

    步驟
    1. 指定落地到 Console 中, 不指定 Trigger

    代碼
    result.writeStream
      .outputMode(OutputMode.Complete())
      .format("console")
      .start()
      .awaitTermination()
  • 按照固定時間間隔劃分批次

    介紹

    使用微批次處理數據, 使用用戶指定的時間間隔啓動批次, 若是間隔指定爲 0, 則儘量快的去處理, 一個批次緊接着一個批次

    • 若是前一批數據提早完成, 待到批次間隔達成的時候再啓動下一個批次

    • 若是前一批數據延後完成, 下一個批次會在前面批次結束後當即啓動

    • 若是沒有數據可用, 則不啓動處理

    步驟
    1. 經過 Trigger.ProcessingTime() 指定處理間隔

    代碼
    result.writeStream
      .outputMode(OutputMode.Complete())
      .format("console")
      .trigger(Trigger.ProcessingTime("2 seconds"))
      .start()
      .awaitTermination()
  • 一次性劃分批次

    介紹

    只劃分一個批次, 處理完成之後就中止 Spark 工做, 當須要啓動一下 Spark 處理遺留任務的時候, 處理完就關閉集羣的狀況下, 這個劃分方式很是實用

    步驟
    1. 使用 Trigger.Once 一次性劃分批次

    代碼
    result.writeStream
      .outputMode(OutputMode.Complete())
      .format("console")
      .trigger(Trigger.Once())
      .start()
      .awaitTermination()
連續流處理
  • 介紹

    • 微批次會將收到的數據按照批次劃分爲不一樣的 DataFrame, 後執行 DataFrame, 因此其數據的處理延遲取決於每一個 DataFrame 的處理速度, 最快也只能在一個 DataFrame 結束後馬上執行下一個, 最快能夠達到 100ms 左右的端到端延遲

    • 而連續流處理能夠作到大約 1ms 的端到端數據處理延遲

    • 連續流處理能夠達到 at-least-once 的容錯語義

    • 從 Spark 2.3 版本開始支持連續流處理, 咱們所採用的 2.2 版本尚未這個特性, 而且這個特性截止到 2.4 依然是實驗性質, 不建議在生產環境中使用

  • 操做

    步驟
    1. 使用特殊的 Trigger 完成功能

    代碼
    result.writeStream
      .outputMode(OutputMode.Complete())
      .format("console")
      .trigger(Trigger.Continuous("1 second"))
      .start()
      .awaitTermination()
  • 限制

    • 只支持 Map 類的有類型操做

    • 只支持普通的的 SQL 類操做, 不支持聚合

    • Source 只支持 Kafka

    • Sink 只支持 KafkaConsoleMemory

5.6. 從 Source 到 Sink 的流程

目標和步驟
目標

理解 Source 到 Sink 的總體原理

步驟
  1. 從 Source 到 Sink 的流程

從 Source 到 Sink 的流程
20190809184239
  1. 在每一個 StreamExecution 的批次最開始, StreamExecution 會向 Source 詢問當前 Source 的最新進度, 即最新的 offset

  2. StreamExecution 將 Offset 放到 WAL 裏

  3. StreamExecution 從 Source 獲取 start offsetend offset 區間內的數據

  4. StreamExecution 觸發計算邏輯 logicalPlan 的優化與編譯

  5. 計算結果寫出給 Sink

    • 調用 Sink.addBatch(batchId: Long, data: DataFrame) 完成

    • 此時纔會由 Sink 的寫入操做開始觸發實際的數據獲取和計算過程

  6. 在數據完整寫出到 Sink 後, StreamExecution 通知 Source 批次 id 寫入到 batchCommitLog, 當前批次結束

5.7. 錯誤恢復和容錯語義

目標和步驟
目標

理解 Structured Streaming 中提供的系統級別容錯手段

步驟
  1. 端到端

  2. 三種容錯語義

  3. Sink 的容錯

端到端
20190809190803
  • Source 多是 KafkaHDFS

  • Sink 也多是 KafkaHDFSMySQL 等存儲服務

  • 消息從 Source 取出, 通過 Structured Streaming 處理, 最後落地到 Sink 的過程, 叫作端到端

三種容錯語義
  • at-most-once

    20190809192258
    • 在數據從 Source 到 Sink 的過程當中, 出錯了, Sink 可能沒收到數據, 可是不會收到兩次, 叫作 at-most-once

    • 通常錯誤恢復的時候, 不重複計算, 則是 at-most-once

  • at-least-once

    20190809192258
    • 在數據從 Source 到 Sink 的過程當中, 出錯了, Sink 必定會收到數據, 可是可能收到兩次, 叫作 at-least-once

    • 通常錯誤恢復的時候, 重複計算可能完成也可能未完成的計算, 則是 at-least-once

  • exactly-once

    20190809192258
    • 在數據從 Source 到 Sink 的過程當中, 雖然出錯了, Sink 必定剛好收到應該收到的數據, 一條不重複也一條都很多, 便是 exactly-once

    • 想作到 exactly-once 是很是困難的

Sink 的容錯
20190809192644
  • 故障恢復通常分爲 Driver 的容錯和 Task 的容錯

    • Driver 的容錯指的是整個系統都掛掉了

    • Task 的容錯指的是一個任務沒運行明白, 從新運行一次

  • 由於 Spark 的 Executor 可以很是好的處理 Task 的容錯, 因此咱們主要討論 Driver 的容錯, 若是出錯的時候

    • 讀取 WAL offsetlog 恢復出最新的 offsets

      當 StreamExecution 找到 Source 獲取數據的時候, 會將數據的起始放在 WAL offsetlog 中, 當出錯要恢復的時候, 就能夠從中獲取當前處理批次的數據起始, 例如 Kafka 的 Offset

    • 讀取 batchCommitLog 決定是否須要重作最近一個批次

      當 Sink 處理完批次的數據寫入時, 會將當前的批次 ID 存入 batchCommitLog, 當出錯的時候就能夠從中取出進行到哪個批次了, 和 WAL 對比便可得知當前批次是否處理完

    • 若是有必要的話, 當前批次數據重作

      • 若是上次執行在 (5) 結束前即失效, 那麼本次執行裏 Sink 應該完整寫出計算結果

      • 若是上次執行在 (5) 結束後才失效, 那麼本次執行裏 Sink 能夠從新寫出計算結果 (覆蓋上次結果), 也能夠跳過寫出計算結果(由於上次執行已經完整寫出過計算結果了)

    • 這樣便可保證每次執行的計算結果, 在 Sink 這個層面, 是 不重不丟 的, 即便中間發生過失效和恢復, 因此 Structured Streaming 能夠作到 exactly-once

容錯所須要的存儲
  • 存儲

    • offsetlog 和 batchCommitLog 關乎於錯誤恢復

    • offsetlog 和 batchCommitLog 須要存儲在可靠的空間裏

    • offsetlog 和 batchCommitLog 存儲在 Checkpoint 中

    • WAL 其實也存在於 Checkpoint 中

  • 指定 Checkpoint

    • 只有指定了 Checkpoint 路徑的時候, 對應的容錯功能才能夠開啓

    aggDF
      .writeStream
      .outputMode("complete")
      .option("checkpointLocation", "path/to/HDFS/dir")  .format("memory") .start()
      指定 Checkpoint 的路徑, 這個路徑對應的目錄必須是 HDFS 兼容的文件系統
須要的外部支持

若是要作到 exactly-once, 只是 Structured Streaming 能作到還不行, 還須要 Source 和 Sink 系統的支持

  • Source 須要支持數據重放

    當有必要的時候, Structured Streaming 須要根據 start 和 end offset 從 Source 系統中再次獲取數據, 這叫作重放

  • Sink 須要支持冪等寫入

    若是須要重作整個批次的時候, Sink 要支持給定的 ID 寫入數據, 這叫冪等寫入, 一個 ID 對應一條數據進行寫入, 若是前面已經寫入, 則替換或者丟棄, 不能重複

因此 Structured Streaming 想要作到 exactly-once, 則也須要外部系統的支持, 以下

Source

Sources

是否可重放

原生內置支持

註解

HDFS

能夠

已支持

包括但不限於 TextJSONCSVParquetORC

Kafka

能夠

已支持

Kafka 0.10.0+

RateStream

能夠

已支持

以必定速率產生數據

RDBMS

能夠

待支持

預計後續很快會支持

Socket

不能夠

已支持

主要用途是在技術會議和講座上作 Demo

Sink

Sinks

是否冪等寫入

原生內置支持

註解

HDFS

能夠

支持

包括但不限於 TextJSONCSVParquetORC

ForeachSink

能夠

支持

可定製度很是高的 Sink, 是否能夠冪等取決於具體的實現

RDBMS

能夠

待支持

預計後續很快會支持

Kafka

不能夠

支持

Kafka 目前不支持冪等寫入, 因此可能會有重複寫入

6. 有狀態算子

目標和步驟
目標

瞭解常見的 Structured Streaming 算子, 可以完成常見的流式計算需求

步驟
  1. 常規算子

  2. 分組算子

  3. 輸出模式

狀態
  • 無狀態算子

    20190814171907
    • 無狀態

  • 有狀態算子

    20190814194604
    • 有中間狀態須要保存

    • 增量查詢

總結
 

6.1. 常規算子

目標和步驟
目標

瞭解 Structured Streaming 的常規數據處理方式

步驟
  1. 案例

案例
  • 需求

    • 給定電影評分數據集 ratings.dat, 位置在 Spark/Files/Dataset/Ratings/ratings.dat

    • 篩選評分超過三分的電影

    • 以追加模式展現數據, 以流的方式來一批數據處理一批數據, 最終每一批次展現爲以下效果

    +------+-------+
    |Rating|MovieID|
    +------+-------+
    |     5|   1193|
    |     4|   3408|
    +------+-------+
  • 步驟

    1. 建立 SparkSession

    2. 讀取並處理數據結構

    3. 處理數據

      1. 選擇要展現的列

      2. 篩選超過三分的數據

    4. 追加模式展現數據到控制檯

  • 代碼

    • 讀取文件的時候只能讀取一個文件夾, 由於是流的操做, 流的場景是源源不斷有新的文件讀取

    val source = spark.readStream
      .textFile("dataset/ratings")
      .map(line => {
        val columns = line.split("::")
        (columns(0).toInt, columns(1).toInt, columns(2).toInt, columns(3).toLong)
      })
      .toDF("UserID", "MovieID", "Rating", "Timestamp")
    
    val result = source.select('Rating, 'MovieID)
        .where('Rating > 3)
總結
  • 針對靜態數據集的不少轉換算子, 均可以應用在流式的 Dataset 上, 例如 MapFlatMapWhereSelect 等

6.2. 分組算子

目標和步驟
目標

可以使用分組完成常見需求, 並瞭解如何擴展行

步驟
  1. 案例

案例
  • 需求

    • 給定電影數據集 movies.dat, 其中三列 MovieIDTitleGenres

    • 統計每一個分類下的電影數量

  • 步驟

    1. 建立 SparkSession

    2. 讀取數據集, 並組織結構

      注意 Genres 是 genres1|genres2 形式, 須要分解爲數組

    3. 使用 explode 函數將數組形式的分類變爲單值多條形式

    4. 分組聚合 Genres

    5. 輸出結果

  • 代碼

    val source = spark.readStream
      .textFile("dataset/movies")
      .map(line => {
        val columns = line.split("::")
        (columns(0).toInt, columns(1).toString, columns(2).toString.split("\\|"))
      })
      .toDF("MovieID", "Title", "Genres")
    
    val result = source.select(explode('Genres) as 'Genres)
        .groupBy('Genres)
        .agg(count('Genres) as 'Count)
    
    result.writeStream
      .outputMode(OutputMode.Complete())
      .format("console")
      .queryName("genres_count")
      .start()
      .awaitTermination()
總結
  • Structured Streaming 不只支持 groupBy, 還支持 groupByKey

相關文章
相關標籤/搜索