Spark(十四)SparkStreaming的官方文檔

1、SparkCore、SparkSQL和SparkStreaming的相似之處

2、SparkStreaming的運行流程

2.1 圖解說明

 

2.2 文字解說

一、咱們在集羣中的其中一臺機器上提交咱們的Application Jar,而後就會產生一個Application,開啓一個Driver,而後初始化SparkStreaming的程序入口StreamingContext;apache

二、Master會爲這個Application的運行分配資源,在集羣中的一臺或者多臺Worker上面開啓Excuter,executer會向Driver註冊;服務器

三、Driver服務器會發送多個receiver給開啓的excuter,(receiver是一個接收器,是用來接收消息的,在excuter裏面運行的時候,其實就至關於一個task任務)socket

四、receiver接收到數據後,每隔200ms就生成一個block塊,就是一個rdd的分區,而後這些block塊就存儲在executer裏面,block塊的存儲級別是Memory_And_Disk_2;oop

五、receiver產生了這些block塊後會把這些block塊的信息發送給StreamingContext;this

六、StreamingContext接收到這些數據後,會根據必定的規則將這些產生的block塊定義成一個rdd;spa

3、SparkStreaming的3個組成部分

4、 離散流(DStream)

5、小例子

5.1 簡單的單詞計數

Scala代碼3d

import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.{SparkConf, SparkContext}

object NetWordCount {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName(this.getClass.getSimpleName).setMaster("local[2]")
    val sparkContext = new SparkContext(conf)
    val sc = new StreamingContext(sparkContext,Seconds(2))
    /**
      * 數據的輸入
      * */
    val inDStream: ReceiverInputDStream[String] = sc.socketTextStream("bigdata",9999)
    inDStream.print()
    /**
      * 數據的處理
      * */
    val resultDStream: DStream[(String, Int)] = inDStream.flatMap(_.split(",")).map((_,1)).reduceByKey(_+_)
    /**
      * 數據的輸出
      * */
    resultDStream.print()

    /**
      *啓動應用程序
      * */
    sc.start()
    sc.awaitTermination()
    sc.stop()
  }
}

在Linux上執行如下命令code

運行結果blog

5.2 監控HDFS上的一個目錄

HDFS上的目錄須要先建立hadoop

Scala代碼

import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.{Seconds, StreamingContext}

object HDFSWordCount {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[2]").setAppName(this.getClass.getSimpleName)
    val sc = new StreamingContext(conf,Seconds(2))

    val inDStream: DStream[String] = sc.textFileStream("hdfs://hadoop1:9000/streaming")
    val resultDStream: DStream[(String, Int)] = inDStream.flatMap(_.split(",")).map((_,1)).reduceByKey(_+_)
    resultDStream.print()
    
    sc.start()
    sc.awaitTermination()
    sc.stop()
  }
}

Linux上的命令

student.txt

95002,劉晨,女,19,IS
95017,王風娟,女,18,IS
95018,王一,女,19,IS
95013,馮偉,男,21,CS
95014,王小麗,女,19,CS
95019,邢小麗,女,19,IS

運行結果,默認展現的10條

5.3 第二次運行的時候更新原先的結果

Scala代碼

import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.{Seconds, StreamingContext}

object UpdateWordCount {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName(this.getClass.getSimpleName).setMaster("local[2]")
    System.setProperty("HADOOP_USER_NAME","hadoop")
    val sparkContext = new SparkContext(conf)

    val sc = new StreamingContext(sparkContext,Seconds(2))

    sc.checkpoint("hdfs://hadoop1:9000/streaming")
    val inDStream: ReceiverInputDStream[String] = sc.socketTextStream("hadoop1",9999)

    val resultDStream: DStream[(String, Int)] = inDStream.flatMap(_.split(","))
      .map((_, 1))
      .updateStateByKey((values: Seq[Int], state: Option[Int]) => {
        val currentCount: Int = values.sum
        val lastCount: Int = state.getOrElse(0)
        Some(currentCount + lastCount)
      })
    resultDStream.print()

    sc.start()
    sc.awaitTermination()
    sc.stop()
  }
}

Linux運行命令

運行結果

5.4 DriverHA

5.3的代碼一直運行,結果能夠一直累加,可是代碼一旦中止運行,再次運行時,結果會不會接着上一次進行計算,上一次的計算結果丟失了,主要緣由上每次程序運行都會初始化一個程序入口,而2次運行的程序入口不是同一個入口,因此會致使第一次計算的結果丟失,第一次的運算結果狀態保存在Driver裏面,因此咱們若是想用上一次的計算結果,咱們須要將上一次的Driver裏面的運行結果狀態取出來,而5.3裏面的代碼有一個checkpoint方法,它會把上一次Driver裏面的運算結果狀態保存在checkpoint的目錄裏面,咱們在第二次啓動程序時,從checkpoint裏面取出上一次的運行結果狀態,把此次的Driver狀態恢復成和上一次Driver同樣的狀態

相關文章
相關標籤/搜索