Spark Streaming基礎

1、Spark Streaming概述:
 
是基於Spark core的API,不須要單獨安裝,一盞式解決
 
可擴展、高吞吐量、容錯性、可以運行在多節點、結合了批處理、機器學習、圖計算等
 
將不一樣的數據源的數據通過Spark Streaming處理後輸出到外部文件系統
 
 
1. 應用場景:
 
實時交易防欺詐檢測、傳感器異常實時反應
 
整理Spark發展史問題(缺乏)
 
 
2. Spark Streaming工做原理:
 
粗粒度:
把實時數據流,以秒數拆分紅批次的小數據塊,經過Spark 當成RDD來處理
 
細粒度:
 
 
 
3. 核心概念:
 
編程入口: StreamingContext
 
經常使用構造方法源碼:
 
def this(sparkContext: SparkContext, batchDuration: Duration) = {
  this(sparkContext, null, batchDuration)
}
 
def this(conf: SparkConf, batchDuration: Duration) = {
  this(StreamingContext.createNewSparkContext(conf), null, batchDuration)
}
 
batchDuration 是必須填的,根據應用程序的延遲需求和資源可用狀況來設置
 
定義好streamingContext後,再定義DStream、transformation等,經過start()開始,stop()結束
 
注意:
一個context啓動後,不能再運行新的streaming(一個JVM只能有一個streamingContext)
 
一旦中止後,就沒辦法再從新開始
 
Stop方法默認把sparkContext和streamingContext同時關掉,要不想關掉sc,必須定義stopSparkContext參數爲false
 
一個SparkContext可以建立多個StreamingContext
 
 
 最基礎的抽象:Discretized Stream  (DStream)
 
一系列的RDD表明一個DStream,是不可變的、分佈式的dataset
 
 
每個RDD表明一個時間段(批次)的數據
 
對DStream進行操做算子(flatMap)時,在底層上看就是對每個RDD作相同的操做,交由Spark core運行
 
 
 數據輸入:Input DStreams and Receivers
 
每個Input DStream 關聯着一個Receiver(但從文件系統接收不須要receiver),receiver 接收數據並存在內存中
 
receiver須要佔用一個線程,因此不能定義local[1],線程的數量n必須大於receivers的數量
 
 
轉換:Transformations on DStreams
 
與RDD相似:map、flatMap、filter、repartition、count...
 
 
 數據輸出:Output Operations and DStreams:
 
輸出到數據庫或者文件系統:
 
API:print、save、foreach
 
 
 
2、Spark Streaming實戰部分:
 
  1. Spark Streaming處理socket數據:
 
接收到的數據進行WordCount操做:
 
在IDEA中:
 
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
 
/*
* Spark Streaming 處理Socket數據
* */
object NetWorkWordCount {
  def main(args: Array[String]): Unit = {
 
    val sparkConf = new SparkConf().setMaster("local[2]").setAppName("NetWorkWordCount")
 
    //建立streamingContext的兩個參數sparkConf和seconds
    val ssc = new StreamingContext(sparkConf, Seconds(5))
 
    //生成Input DStream
    val lines = ssc.socketTextStream("localhost", 6789)
 
    val result = lines.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_+_)
 
    result.print()
 
    ssc.start()
    ssc.awaitTermination()
 
 
  }
}

 

 
在控制檯中:
 
nc -lk 6789,建立一個Socket
 
在這上面輸入數據,就能夠在IDEA中count出來了
 
 
注意:
在執行過程當中會報錯,必須在Maven projects中找出報錯提示中所缺乏的包,而且在dependency上加入。
當projects中尚未的包,在 http://mvnrepository.com 上搜索相應的dependency,而後讓Maven幫咱們自動下載。
 
 
 
 
  1. Spark Streaming處理HDFS中的數據:
 
ssc.textFileStream("file_path")
 
一樣是像上面同樣,只是改了stream的source
 
可是測試時,必需要是生成新的文件(官網稱爲moving進去的文件),纔會被統計;而往舊的文件裏再添加數據,也不會被統計了
 
 
 
  1. Spark Streaming進階實戰:
 
帶狀態的算子UpdateStateByKey、保存到MySQL、window函數
 
 
UpdateStateByKey實現實時更新:
 
容許把新舊狀態結合,連續地更新
 
準備工做:
  1. 定義一個狀態
  2. 定義狀態更新的方法
 
注意:
 
  1. updateFunction須要隱式轉換
  2. 報錯:Exception in thread "main" java.lang.IllegalArgumentException: requirement failed: The checkpoint directory has not been set.
            意思就是要進行 checkpoint記錄
 
 
實現代碼:
 
把reduceByKey刪除,而且把map以後的RDD定義爲一個state,配合這個state寫狀態更新方法
 
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
 
/*
* Spark Streaming有狀態的統計
* */
object StatefulWordCount {
  def main(args: Array[String]): Unit = {
 
    val sparkConf = new SparkConf().setMaster("local[2]").setAppName("StatefulWordCount")
    val ssc = new StreamingContext(sparkConf, Seconds(5))
 
    //使用狀態算子必需要設置checkpoint
    //通常要保存記錄在HDFS中
    ssc.checkpoint(".")
 
    val lines = ssc.socketTextStream("localhost", 6789)
    val result = lines.flatMap(_.split(" ")).map((_, 1))  //不能用reduceByKey
 
    //連續更新狀態
    val state = result.updateStateByKey(updateFunction _)  //須要隱式轉換
    state.print()
 
    ssc.start()
    ssc.awaitTermination()
  }
 
  /*
  * 狀態更新方法更新已有的數據,放在updateStateByKey中
  * */
  def updateFunction(currData: Seq[Int], prevData: Option[Int]): Option[Int] = {
 
    val curr = currData.sum  //算出當前的總次數
    val prev = prevData.getOrElse(0)  //讀取已有的
 
    //返回已有和當前的和
    Some(curr + prev)
  }
}

 

 
 
  1. 統計結果寫到MySQL中:
 
前提準備:
須要在IDEA中增長mysql的connector依賴
在mysql數據庫中先建立一張表
寫jdbc建立鏈接到Mysql
 
 
使用foreachRDD,有不少種 錯誤的寫法:(沒有序列化,建立太多mysql鏈接等)
 
報錯沒有序列化:
 
dstream.foreachRDD {rdd =>
val connection = createNewConnection() // executed at the driver
rdd.foreach {record =>connection.send(record) // executed at the worker
}
}
 
花太多開銷在鏈接和斷開數據庫上
 
dstream.foreachRDD {rdd =>
rdd.foreach {record =>
val connection = createNewConnection()
connection.send(record)
connection.close()
}
}
 
 
官方正確寫法:
 
使用foreachPartition進行優化鏈接:
 
dstream.foreachRDD {rdd =>
rdd.foreachPartition {partitionOfRecords =>
val connection = createNewConnection()  //建立mysql鏈接
partitionOfRecords.foreach(record =>
connection.send(record))
connection.close()
}
}
 
用鏈接池進行進一步優化:
 
Finally, this can be further optimized by reusing connection objects across multiple RDDs/batches. One can maintain a static pool of connection objects than can be reused as RDDs of multiple batches are pushed to the external system, thus further reducing the overheads.
 
dstream.foreachRDD {rdd =>
rdd.foreachPartition {partitionOfRecords =>
// ConnectionPool is a static, lazily initialized pool of connections
val connection = ConnectionPool.getConnection()
partitionOfRecords.foreach(record =>connection.send(record))
ConnectionPool.returnConnection(connection) // return to the pool for future reuse
}
}
 
在寫入MySQL數據時,應該做一個是否存在的判斷:
若存在則使用update語句,不存在則使用insert語句
 
 
 
  1. Window的使用:
 
 
兩個參數:
window length:窗口長度
sliding interval:窗口間隔
 
也就是每隔sliding interval統計前window length的值
 
API:countByWindow、reduceByWindow…
 
 
 
  1. 實戰:黑名單過濾
 
transform算子的使用+Spark Streaming整合RDD操做
 
元組默認從1開始數
 
假設輸入數據爲id, name 這種形式
 
實現過程:
  1. 創建黑名單元組 => (name, true)
  2. 把輸入數據流編程元組 => (name, (id, name))
  3. transform,把每一個DStream變成一個個RDD操做
  4. 數據流的RDD與黑名單RDD進行leftjoin,得到新的元組
  5. filter判斷過濾
  6. 整合輸出
 
實現代碼:
 
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
/*
* 黑名單過濾demo
* */
object TransformApp {
  def main(args: Array[String]): Unit = {
 
    val sparkConf = new SparkConf().setMaster("local[2]").setAppName("TransformApp")
    val ssc = new StreamingContext(sparkConf, Seconds(5))
 
    //構建黑名單列表, 實際應用中可在外面讀取列表, 並轉成RDD, 用true標記爲是黑名單元組(name, true)
    val blacks = List("zs", "ls")
    val blacksRDD = ssc.sparkContext.parallelize(blacks).map(x => (x, true))
 
    //獲取每行
    val lines = ssc.socketTextStream("localhost", 6789)
    //把id, name => 元組(name, (id, name))
    //transform 的使用,對stream的每一個RDD操做
    val filterResult = lines.map(x => (x.split(",")(1), x)).transform(rdd => {
      //與黑名單進行leftjoin => (name, ((id, name), true)), 並過濾出是true的項
      rdd.leftOuterJoin(blacksRDD)
        .filter(x => x._2._2.getOrElse(false) != true)   //過濾出不等於true的
        .map(x => x._2._1)
    })
    filterResult.print()
 
    ssc.start()
    ssc.awaitTermination()
  }
}

 

 
 
  1. Spark Streaming整合Spark SQL
 
整合完成詞頻統計操做
 
官網代碼:
 
 
就是foreachRDD把streaming轉成RDD,而後toDF就能夠進行DataFrame或者是sql的操做了
相關文章
相關標籤/搜索